Skip to content

Commit

Permalink
Add trace context propagation in atomic steps
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrille-leclerc authored Jul 11, 2024
2 parents b310419 + 2941b44 commit 2eededb
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright The Original Author or Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.jenkins.plugins.opentelemetry.init;

import hudson.Extension;
import hudson.util.ClassLoaderSanityThreadFactory;
import hudson.util.DaemonThreadFactory;
import hudson.util.NamingThreadFactory;
import io.jenkins.plugins.opentelemetry.api.OpenTelemetryLifecycleListener;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import org.jenkinsci.plugins.workflow.steps.SynchronousNonBlockingStepExecution;

import javax.annotation.Nonnull;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Jenkins {@link Extension} that instruments the executor service used by
* {@link SynchronousNonBlockingStepExecution} so that tasks submitted to it go through
* {@link Context#taskWrapping(ExecutorService)}, OpenTelemetry's context-propagating executor wrapper.
* <p>
* The instrumentation is applied reflectively on the static {@code executorService} field of
* {@link SynchronousNonBlockingStepExecution}, each time {@link #afterConfiguration(ConfigProperties)} is invoked.
*/
@Extension
public class StepExecutionInstrumentationInitializer implements OpenTelemetryLifecycleListener {

// Logger named after this class; used for FINE-level tracing of the instrumentation steps.
final static Logger logger = Logger.getLogger(StepExecutionInstrumentationInitializer.class.getName());

/**
* Replaces the static {@code executorService} field of {@link SynchronousNonBlockingStepExecution} with a
* wrapper returned by {@link Context#taskWrapping(ExecutorService)}.
* <p>
* NOTE(review): every invocation wraps whatever the field holds at that moment, so a second invocation would
* wrap the wrapper installed by the first one. Confirm whether this listener can be invoked more than once
* (e.g. on reconfiguration) and whether nested wrappers are acceptable.
*
* @param configProperties the OpenTelemetry configuration; not read by this method
* @throws RuntimeException wrapping {@link NoSuchFieldException} or {@link IllegalAccessException} when the
* {@code executorService} field cannot be found or accessed
*/
@Override
public void afterConfiguration(@Nonnull ConfigProperties configProperties) {
try {
logger.log(Level.FINE, () -> "Instrumenting " + SynchronousNonBlockingStepExecution.class.getName() + "...");
Class<SynchronousNonBlockingStepExecution> synchronousNonBlockingStepExecutionClass = SynchronousNonBlockingStepExecution.class;
// Debugging aid: list the declared fields of the target class at FINE level.
Arrays.stream(synchronousNonBlockingStepExecutionClass.getDeclaredFields()).forEach(field -> logger.log(Level.FINE, () -> "Field: " + field.getName()));
// The field is looked up by name, so a rename in the target class surfaces as NoSuchFieldException below.
Field executorServiceField = synchronousNonBlockingStepExecutionClass.getDeclaredField("executorService");
executorServiceField.setAccessible(true);
// Static field, hence the null receiver. If the field has no value yet, a cached thread pool is created here instead.
// NOTE(review): the fallback pool presumably mirrors the one the step class would create lazily - confirm against its source.
ExecutorService executorService = (ExecutorService) Optional.ofNullable(executorServiceField.get(null)).orElseGet(() -> Executors.newCachedThreadPool(new NamingThreadFactory(new ClassLoaderSanityThreadFactory(new DaemonThreadFactory()), "org.jenkinsci.plugins.workflow.steps.SynchronousNonBlockingStepExecution")));
ExecutorService instrumentedExecutorService = Context.taskWrapping(executorService);
// Write the wrapper back so the step class uses the instrumented executor from now on.
executorServiceField.set(null, instrumentedExecutorService);

// org.jenkinsci.plugins.workflow.cps.CpsThreadGroup.runner
// NOTE(review): the line above looks like a pointer to another executor that could be instrumented the same way - confirm intent.
} catch (NoSuchFieldException | IllegalAccessException e) {
// Reflection failures are rethrown unchecked, with the original exception kept as the cause.
throw new RuntimeException(e);

Check warning on line 45 in src/main/java/io/jenkins/plugins/opentelemetry/init/StepExecutionInstrumentationInitializer.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered lines

Lines 44-45 are not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class MonitoringAction extends AbstractMonitoringAction implements Action
private transient Run run;

public MonitoringAction(Span span) {
super(span);
super(span, Collections.emptyList());
this.rootSpanName = super.getSpanName();
this.rootContext = super.getW3cTraceContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,38 +200,37 @@ public void onAtomicStep(@NonNull StepAtomNode node, @NonNull WorkflowRun run) {
LOGGER.log(Level.FINE, () -> run.getFullDisplayName() + " - don't create span for step '" + node.getDisplayFunctionName() + "'");
return;
}
try (Scope ignored = setupContext(run, node)) {
verifyNotNull(ignored, "%s - No span found for node %s", run, node);
Scope encapsulatingNodeScope = setupContext(run, node);

String principal = Objects.toString(node.getExecution().getAuthentication().getPrincipal(), "#null#");
LOGGER.log(Level.FINE, () -> node.getDisplayFunctionName() + " - principal: " + principal);
verifyNotNull(encapsulatingNodeScope, "%s - No span found for node %s", run, node);

StepHandler stepHandler = getStepHandlers().stream().filter(sh -> sh.canCreateSpanBuilder(node, run)).findFirst()
.orElseThrow((Supplier<RuntimeException>) () ->
new IllegalStateException("No StepHandler found for node " + node.getClass() + " - " + node + " on " + run));
SpanBuilder spanBuilder = stepHandler.createSpanBuilder(node, run, getTracer());
String principal = Objects.toString(node.getExecution().getAuthentication().getPrincipal(), "#null#");

String stepType = getStepType(node, node.getDescriptor(), JenkinsOtelSemanticAttributes.STEP_NAME);
JenkinsOpenTelemetryPluginConfiguration.StepPlugin stepPlugin = JenkinsOpenTelemetryPluginConfiguration.get().findStepPluginOrDefault(stepType, node);
StepHandler stepHandler = getStepHandlers().stream().filter(sh -> sh.canCreateSpanBuilder(node, run)).findFirst()
.orElseThrow((Supplier<RuntimeException>) () ->
new IllegalStateException("No StepHandler found for node " + node.getClass() + " - " + node + " on " + run));
SpanBuilder spanBuilder = stepHandler.createSpanBuilder(node, run, getTracer());

spanBuilder
.setParent(Context.current()) // TODO can we remove this call?
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_TYPE, stepType)
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_ID, node.getId())
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_NAME, getStepName(node, JenkinsOtelSemanticAttributes.STEP_NAME))
.setAttribute(JenkinsOtelSemanticAttributes.CI_PIPELINE_RUN_USER, principal)
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_PLUGIN_NAME, stepPlugin.getName())
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_PLUGIN_VERSION, stepPlugin.getVersion());
String stepType = getStepType(node, node.getDescriptor(), JenkinsOtelSemanticAttributes.STEP_NAME);
JenkinsOpenTelemetryPluginConfiguration.StepPlugin stepPlugin = JenkinsOpenTelemetryPluginConfiguration.get().findStepPluginOrDefault(stepType, node);

Span atomicStepSpan = spanBuilder.startSpan();
LOGGER.log(Level.FINE, () -> run.getFullDisplayName() + " - > " + node.getDisplayFunctionName() + " - begin " + OtelUtils.toDebugString(atomicStepSpan));
try (Scope ignored2 = atomicStepSpan.makeCurrent()) {
stepHandler.afterSpanCreated(node, run);
}
getTracerService().putSpan(run, atomicStepSpan, node);
}
spanBuilder
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_TYPE, stepType)
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_ID, node.getId())
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_NAME, getStepName(node, JenkinsOtelSemanticAttributes.STEP_NAME))
.setAttribute(JenkinsOtelSemanticAttributes.CI_PIPELINE_RUN_USER, principal)
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_PLUGIN_NAME, stepPlugin.getName())
.setAttribute(JenkinsOtelSemanticAttributes.JENKINS_STEP_PLUGIN_VERSION, stepPlugin.getVersion());

Span atomicStepSpan = spanBuilder.startSpan();
LOGGER.log(Level.FINE, () -> run.getFullDisplayName() + " - > " + node.getDisplayFunctionName() + " - begin " + OtelUtils.toDebugString(atomicStepSpan));
Scope atomicStepScope = atomicStepSpan.makeCurrent();
stepHandler.afterSpanCreated(node, run);

getTracerService().putSpanAndScopes(run, atomicStepSpan, node, Arrays.asList(encapsulatingNodeScope, atomicStepScope));
}


@Override
public void onAfterAtomicStep(@NonNull StepAtomNode node, FlowNode nextNode, @NonNull WorkflowRun run) {
if (isIgnoredStep(node.getDescriptor())){
Expand Down Expand Up @@ -371,7 +370,7 @@ private void endCurrentSpan(FlowNode node, WorkflowRun run, GenericStatus status
span.end();
LOGGER.log(Level.FINE, () -> run.getFullDisplayName() + " - < " + node.getDisplayFunctionName() + " - end " + OtelUtils.toDebugString(span));

getTracerService().removePipelineStepSpan(run, node, span);
getTracerService().removePipelineStepSpanAndCloseAssociatedScopes(run, node, span);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.jenkins.plugins.opentelemetry.semconv.JenkinsOtelSemanticAttributes;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.jenkinsci.plugins.workflow.cps.nodes.StepEndNode;
import org.jenkinsci.plugins.workflow.cps.nodes.StepStartNode;
import org.jenkinsci.plugins.workflow.flow.FlowExecution;
Expand Down Expand Up @@ -155,7 +156,7 @@ private Iterable<FlowNode> getAncestors(@NonNull final FlowNode flowNode) {
return ancestors;
}

public void removePipelineStepSpan(@NonNull WorkflowRun run, @NonNull FlowNode flowNode, @NonNull Span span) {
public void removePipelineStepSpanAndCloseAssociatedScopes(@NonNull WorkflowRun run, @NonNull FlowNode flowNode, @NonNull Span span) {
FlowNode startSpanNode;
if (flowNode instanceof AtomNode) {
startSpanNode = flowNode;
Expand Down Expand Up @@ -183,7 +184,7 @@ public void removePipelineStepSpan(@NonNull WorkflowRun run, @NonNull FlowNode f
.filter(flowNodeMonitoringAction -> Objects.equals(flowNodeMonitoringAction.getSpanId(), span.getSpanContext().getSpanId()))
.findFirst()
.ifPresentOrElse(
FlowNodeMonitoringAction::purgeSpan,
FlowNodeMonitoringAction::purgeSpanAndCloseAssociatedScopes,
() -> {
String msg = "span not found to be purged: " + OtelUtils.toDebugString(span) +
" ending " + OtelUtils.toDebugString(startSpanNode) + " in " + run;
Expand All @@ -204,20 +205,20 @@ public void removeBuildStepSpan(@NonNull AbstractBuild build, @NonNull BuildStep
.reverse()
.stream()
.filter(buildStepMonitoringAction -> Objects.equals(buildStepMonitoringAction.getSpanId(), span.getSpanContext().getSpanId()))
.findFirst().ifPresentOrElse(BuildStepMonitoringAction::purgeSpan, () -> {
.findFirst().ifPresentOrElse(BuildStepMonitoringAction::purgeSpanAndCloseAssociatedScopes, () -> {
throw new IllegalStateException("span not found to be purged: " + span + " for " + buildStep);
});
}

public void purgeRun(@NonNull Run run) {
run.getActions(OtelMonitoringAction.class).forEach(OtelMonitoringAction::purgeSpan);
run.getActions(OtelMonitoringAction.class).forEach(OtelMonitoringAction::purgeSpanAndCloseAssociatedScopes);
// TODO verify we don't need this cleanup

Check warning on line 215 in src/main/java/io/jenkins/plugins/opentelemetry/job/OtelTraceService.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: verify we don't need this cleanup
if (run instanceof WorkflowRun) {
WorkflowRun workflowRun = (WorkflowRun) run;
List<FlowNode> flowNodesHeads = Optional.ofNullable(workflowRun.getExecution()).map(FlowExecution::getCurrentHeads).orElse(Collections.emptyList());
ForkScanner scanner = new ForkScanner();
scanner.setup(flowNodesHeads);
StreamSupport.stream(scanner.spliterator(), false).forEach(flowNode -> flowNode.getActions(OtelMonitoringAction.class).forEach(OtelMonitoringAction::purgeSpan));
StreamSupport.stream(scanner.spliterator(), false).forEach(flowNode -> flowNode.getActions(OtelMonitoringAction.class).forEach(OtelMonitoringAction::purgeSpanAndCloseAssociatedScopes));
}
}

Expand Down Expand Up @@ -260,6 +261,14 @@ public void putSpan(@NonNull Run run, @NonNull Span span, @NonNull FlowNode flow
OtelUtils.toDebugString(flowNode) + ", " + OtelUtils.toDebugString(span) + ")");
}

/**
* Attaches a new {@link FlowNodeMonitoringAction} holding the given span and scopes to the given flow node,
* then logs the operation at FINE level.
* <p>
* NOTE(review): {@code scopes} carries no nullability annotation here, while the {@code SpanAndScopes}
* constructor that eventually receives it declares the list as non-null - confirm that callers never pass null.
*
* @param run the run the span belongs to; only used for the log message
* @param span the span to associate with the flow node
* @param flowNode the flow node that receives the monitoring action
* @param scopes the scopes to store alongside the span in the monitoring action
*/
public void putSpanAndScopes(@NonNull Run run, @NonNull Span span, @NonNull FlowNode flowNode, List<Scope> scopes) {
// FYI for agent allocation, we have 2 FlowNodeMonitoringAction to track the agent allocation duration
flowNode.addAction(new FlowNodeMonitoringAction(span, scopes));

// NOTE(review): the message prefix reads "putSpan(" although this method is putSpanAndScopes - confirm whether that is intentional.
LOGGER.log(Level.FINE, () -> "putSpan(" + run.getFullDisplayName() + ", " +
OtelUtils.toDebugString(flowNode) + ", " + OtelUtils.toDebugString(span) + ")");

Check warning on line 269 in src/main/java/io/jenkins/plugins/opentelemetry/job/OtelTraceService.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered line

Line 269 is not covered by tests
}

private void setAttributesToSpan(@NonNull Span span, OpenTelemetryAttributesAction openTelemetryAttributesAction) {
if (openTelemetryAttributesAction == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,19 @@
package io.jenkins.plugins.opentelemetry.job.action;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;

import java.util.Collections;
import java.util.List;

public abstract class AbstractInvisibleMonitoringAction extends AbstractMonitoringAction {

public AbstractInvisibleMonitoringAction(Span span) {
super(span);
super(span, Collections.emptyList());
}

public AbstractInvisibleMonitoringAction(Span span, List<Scope> scopes) {
super(span, scopes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.jenkins.plugins.opentelemetry.job.action;

import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.NonNull;
import hudson.model.Action;
Expand All @@ -17,6 +18,7 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
Expand All @@ -25,14 +27,20 @@
public abstract class AbstractMonitoringAction implements Action, OtelMonitoringAction {
private final static Logger LOGGER = Logger.getLogger(AbstractMonitoringAction.class.getName());

private transient Span span;
transient SpanAndScopes spanAndScopes;


final String traceId;
final String spanId;
protected String spanName;
protected Map<String, String> w3cTraceContext;

public AbstractMonitoringAction(Span span) {
this.span = span;
/**
* @param span span of this action
* @param scopes scope and underlying scopes associated with the span.
*/
public AbstractMonitoringAction(Span span, List<Scope> scopes) {
this.spanAndScopes = new SpanAndScopes(span, scopes, Thread.currentThread().getName());
this.traceId = span.getSpanContext().getTraceId();
this.spanId = span.getSpanContext().getSpanId();
this.spanName = span instanceof ReadWriteSpan ? ((ReadWriteSpan) span).getName() : null; // when tracer is no-op, span is NOT a ReadWriteSpan
Expand All @@ -41,6 +49,8 @@ public AbstractMonitoringAction(Span span) {
W3CTraceContextPropagator.getInstance().inject(Context.current(), w3cTraceContext, (carrier, key, value) -> carrier.put(key, value));
this.w3cTraceContext = w3cTraceContext;
}

LOGGER.log(Level.FINE, () -> "Span " + getSpanName() + ", thread=" + spanAndScopes.scopeStartThreadName + " opened " + spanAndScopes.scopes.size() + " scopes");
}

public String getSpanName() {
Expand All @@ -55,7 +65,7 @@ public Map<String, String> getW3cTraceContext() {
@Override
@CheckForNull
public Span getSpan() {
return span;
return spanAndScopes.span;
}

public String getTraceId() {
Expand All @@ -67,9 +77,14 @@ public String getSpanId() {
}

@Override
public void purgeSpan() {
LOGGER.log(Level.FINE, () -> "Purge span='" + spanName + "', spanId=" + spanId + ", traceId=" + traceId + ": " + (span == null ? "#null#" : "purged"));
this.span = null;
public void purgeSpanAndCloseAssociatedScopes() {
LOGGER.log(Level.FINE, () -> "Purge span='" + spanName + "', spanId=" + spanId + ", traceId=" + traceId + ": " + spanAndScopes);
Optional.ofNullable(spanAndScopes)
.map(spanAndScopes -> spanAndScopes.scopes)
.map(ImmutableList::copyOf)
.map(ImmutableList::reverse)
.ifPresent(scopes -> scopes.forEach(Scope::close));
this.spanAndScopes = null;
}

@Override
Expand All @@ -85,8 +100,40 @@ public String toString() {
public boolean hasEnded() {
return
Optional
.ofNullable(span).map(s -> s instanceof ReadableSpan ? (ReadableSpan) s : null) // cast to ReadableSpan
.ofNullable(spanAndScopes)
.map(sac -> sac.span)
.filter(s -> s instanceof ReadableSpan)
.map(s -> (ReadableSpan) s)
.map(ReadableSpan::hasEnded)
.orElse(true);
}

/**
* Holder grouping a span with the scopes associated with it and the name of the thread recorded when the
* holder was created.
* <p>
* The scopes are the scope of the span itself and the underlying scopes instantiated to create the span.
* Underlying scopes can be the scope of the underlying wrapping pipeline step (eg a `stage` step).
* The thread name is kept for debugging, to identify potential leaks.
*/
static class SpanAndScopes {
// The span tracked by the enclosing monitoring action.
@NonNull
final Span span;
// Scopes associated with the span; the list passed to the constructor is stored as is, without copying.
@NonNull
final List<Scope> scopes;
// Name of the thread supplied at construction time; used for debugging only.
@NonNull
final String scopeStartThreadName;

/**
* @param span span to hold
* @param scopes scopes associated with the span; stored by reference
* @param scopeStartThreadName name of the thread on which the scopes were opened
*/
public SpanAndScopes(@NonNull Span span, @NonNull List<Scope> scopes, @NonNull String scopeStartThreadName) {
this.span = span;
this.scopes = scopes;
this.scopeStartThreadName = scopeStartThreadName;
}

/**
* @return a debug representation showing the span, the number of scopes (not the scopes themselves)
* and the recorded thread name
*/
@Override
public String toString() {
return "SpanAndScopes{" +
"span=" + span +
", scopes=" + scopes.size() +

Check warning on line 134 in src/main/java/io/jenkins/plugins/opentelemetry/job/action/AbstractMonitoringAction.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered lines

Lines 132-134 are not covered by tests
", scopeStartThreadName='" + scopeStartThreadName + '\'' +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@
package io.jenkins.plugins.opentelemetry.job.action;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;

import java.util.List;

/**
 * Monitoring action that keeps a reference to the span associated with a
 * {@link org.jenkinsci.plugins.workflow.graph.FlowNode}, optionally together with the scopes
 * associated with that span.
 */
public class FlowNodeMonitoringAction extends AbstractInvisibleMonitoringAction {

    /**
     * Creates an action for a span together with its associated scopes.
     *
     * @param span   span associated with the flow node
     * @param scopes scopes associated with the span
     */
    public FlowNodeMonitoringAction(Span span, List<Scope> scopes) {
        super(span, scopes);
    }

    /**
     * Creates an action for a span, without passing any scopes.
     *
     * @param span span associated with the flow node
     */
    public FlowNodeMonitoringAction(Span span) {
        super(span);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface OtelMonitoringAction extends Action {
@CheckForNull
Span getSpan();

void purgeSpan();
void purgeSpanAndCloseAssociatedScopes();

/**
* @return {@code true} if the associated {@link Span} has ended
Expand Down
Loading

0 comments on commit 2eededb

Please sign in to comment.