Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

instrument spring scheduling @Async #1873

Merged
merged 6 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public boolean matches(final T target) {
}

if (name.startsWith("org.springframework.")) {
if (name.startsWith("org.springframework.aop.")
if ((name.startsWith("org.springframework.aop.")
&& !name.equals("org.springframework.aop.interceptor.AsyncExecutionInterceptor"))
|| name.startsWith("org.springframework.cache.")
|| name.startsWith("org.springframework.dao.")
|| name.startsWith("org.springframework.ejb.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ testSets {
}

dependencies {
// 3.2.3 is the first version with which the tests will run. Lower versions require other
// classes and packages to be imported. Versions 3.1.0+ work with the instrumentation.
compileOnly group: 'org.springframework', name: 'spring-context', version: '3.1.0.RELEASE'
testCompile group: 'org.springframework', name: 'spring-context', version: '3.2.3.RELEASE'
// choose a recent version so that we can test both lambdas (JDK8)
// @Async requires proxying and older versions can't read classfile versions > 51
// we muzzle older versions of spring anyway
compileOnly group: 'org.springframework', name: 'spring-context', version: '5.0.0.RELEASE'
testCompile group: 'org.springframework', name: 'spring-context', version: '5.0.0.RELEASE'

// for a test case with CompletableFuture
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')

// this is the latest version that supports Java 7
richardstartin marked this conversation as resolved.
Show resolved Hide resolved
latestDepTestCompile group: 'org.springframework', name: 'spring-context', version: '4.+'
latestDepTestCompile group: 'org.springframework', name: 'spring-context', version: '5.+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package datadog.trace.instrumentation.springscheduling;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.springscheduling.SpringSchedulingDecorator.DECORATE;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Method;
import org.aopalliance.intercept.MethodInvocation;

public class SpannedMethodInvocation implements MethodInvocation {

private final AgentSpan parent;
private final MethodInvocation delegate;

public SpannedMethodInvocation(AgentSpan parent, MethodInvocation delegate) {
this.parent = parent;
this.delegate = delegate;
}

@Override
public Method getMethod() {
return delegate.getMethod();
}

@Override
public Object[] getArguments() {
return delegate.getArguments();
}

@Override
public Object proceed() throws Throwable {
CharSequence spanName = DECORATE.spanNameForMethod(delegate.getMethod());
final AgentSpan span =
richardstartin marked this conversation as resolved.
Show resolved Hide resolved
parent == null ? startSpan(spanName) : startSpan(spanName, parent.context());
try (AgentScope scope = activateSpan(span)) {
// question: is this necessary? What does it do?
// if the delegate does async work is everything OK because of this?
// if the delegate does async work, should I need to worry about it here?
scope.setAsyncPropagation(true);
Comment on lines +39 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will hopefully be unnecessary with the planned changes to PendingTrace.

return delegate.proceed();
} finally {
// question: Why can't this just be AutoCloseable? Dogma?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a fair bit of history here, but I can't find any original sources. I think these summarize the problem well:
opentracing/opentracing-java#361 (comment)
https://github.com/Nike-Inc/wingtips#warning-about-error-handling-when-using-try-with-resources-to-autoclose-spans

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That reads like an issue with programming discipline, and sacrifices convenience in the common case for the inability to be incorrect in the rare case. Just my opinion.

span.finish();
}
}

@Override
public Object getThis() {
return delegate.getThis();
}

@Override
public AccessibleObject getStaticPart() {
return delegate.getStaticPart();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package datadog.trace.instrumentation.springscheduling;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;

import net.bytebuddy.asm.Advice;
import org.aopalliance.intercept.MethodInvocation;

public class SpringAsyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void scheduleAsync(
@Advice.Argument(value = 0, readOnly = false) MethodInvocation invocation) {
invocation = new SpannedMethodInvocation(activeSpan(), invocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One other thing to mention is that currently this will result in the span likely being reported as a separate trace since you're not using continuations to force the trace to stick around.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is fine modeling wise because it's timing does not matter for the critical path

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by saying it is a separate trace, isn't it just spans sent separately? That's what I think it should be.

I think today. These spans might go to PendingTrace that is thought to be done, but I think that's a problem with PendingTrace.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dougqh to clarify, I mean it would likely be reported independently from the rest of the trace.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like the wrong API to me. I should need to call AgentSpan.finish() to finish the span. DDSpanContext has a PendingTrace - so why shouldn't AgentSpan.finish() be enough to hold the PendingTrace back?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, right now, an unfinished span holds back sending the trace leg.
But that's leading to problems where an unfinished span, holds back the whole trace.

I think we want to get away from that.

My thought is that a span is published in the first payload when it finishes before the root span.
(Except for long root spans for batch jobs where we need to send sooner).

Otherwise, the span is deemed to have finished late. If it makes it with a grace period, it can be included. Otherwise, it goes separately. But basically, it is only guarantee if it finishes before the root.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package datadog.trace.instrumentation.springscheduling;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;

@AutoService(Instrumenter.class)
public class SpringAsyncInstrumentation extends Instrumenter.Default {

public SpringAsyncInstrumentation() {
super("spring-async");
}

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("org.springframework.aop.interceptor.AsyncExecutionInterceptor");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little concerned about adding instrumentation for a class that seems relatively unrelated to scheduling in the module for spring-scheduling. Is @Async used for anything besides spring scheduling?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This intercepts the annotation (notice the package) org.springframework.scheduling.annotation.Async.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this seems pretty clearly related to Async to me. Although, I'll admit their package naming makes me a little concern that maybe AsyncExecutionInterceptor does more.

But presuming AsyncExecutionInterceptor is specific to @Async annotations, I think this is probably the right place.
It solves my general aim which to let the frameworks find the target methods for us -- rather than us seeking them out.

}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SpannedMethodInvocation", packageName + ".SpringSchedulingDecorator"
};
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return Collections.singletonMap(
isMethod()
.and(
named("invoke")
.and(
ElementMatchers.takesArgument(
0, named("org.aopalliance.intercept.MethodInvocation")))),
packageName + ".SpringAsyncAdvice");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import datadog.trace.agent.test.AgentTestRunner
import org.springframework.context.annotation.AnnotationConfigApplicationContext

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace

class SpringAsyncTest extends AgentTestRunner {

def "context propagated through @async annotation" () {
setup:
def context = new AnnotationConfigApplicationContext(AsyncTaskConfig)
AsyncTask asyncTask = context.getBean(AsyncTask)
when:
runUnderTrace("root") {
asyncTask.async().join()
}
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
resourceName "root"
}
span(1) {
resourceName "AsyncTask.async"
threadNameStartsWith "SimpleAsyncTaskExecutor"
childOf span(0)
}
span(2) {
resourceName "AsyncTask.getInt"
threadNameStartsWith "SimpleAsyncTaskExecutor"
childOf span(1)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class SpringSchedulingTest extends AgentTestRunner {
}
}
}
cleanup: context.close()
}

def "schedule interval test"() {
Expand All @@ -54,7 +55,7 @@ class SpringSchedulingTest extends AgentTestRunner {
}
}
}

cleanup: context.close()
}

def "schedule lambda test"() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import datadog.trace.api.Trace;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.springframework.scheduling.annotation.Async;

public class AsyncTask {

@Async
public CompletableFuture<Integer> async() {
return CompletableFuture.completedFuture(getInt());
}

@Trace
public int getInt() {
return ThreadLocalRandom.current().nextInt();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
@EnableAsync
public class AsyncTaskConfig {

@Bean
AsyncTask asyncTask() {
return new AsyncTask();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class SpanAssert {
traceDDId(parent.traceId)
}

def threadNameStartsWith(String threadName) {
assert span.tags.get("thread.name")?.startsWith(threadName)
}

def notChildOf(DDSpan parent) {
assert parent.spanId != span.parentId
assert parent.traceId != span.traceId
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package datadog.smoketest.springboot;

import datadog.smoketest.springboot.grpc.AsynchronousGreeter;
import java.util.concurrent.CompletableFuture;
import org.springframework.scheduling.annotation.Async;

public class AsyncTask {

private final AsynchronousGreeter greeter;

public AsyncTask(AsynchronousGreeter greeter) {
this.greeter = greeter;
}

@Async
public CompletableFuture<String> greet() {
return CompletableFuture.completedFuture(greeter.greet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package datadog.smoketest.springboot;

import datadog.smoketest.springboot.grpc.AsynchronousGreeter;
import datadog.smoketest.springboot.grpc.LocalInterface;
import datadog.smoketest.springboot.grpc.SynchronousGreeter;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableAsync
@EnableScheduling
public class SpringbootGrpcApplication {

@Bean
AsyncTask asyncTask(AsynchronousGreeter greeter) {
return new AsyncTask(greeter);
}

@Bean
AsynchronousGreeter asynchronousGreeter(LocalInterface localInterface) {
return new AsynchronousGreeter(localInterface.getPort());
}

@Bean
SynchronousGreeter synchronousGreeter(LocalInterface localInterface) {
return new SynchronousGreeter(localInterface.getPort());
}

@Bean
LocalInterface localInterface() throws IOException {
return new LocalInterface();
}

public static void main(final String[] args) {
ConfigurableApplicationContext app =
SpringApplication.run(SpringbootGrpcApplication.class, args);
Integer port = app.getBean("local.server.port", Integer.class);
System.out.println(
"Bound to " + port + " in " + ManagementFactory.getRuntimeMXBean().getUptime() + "ms");
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package datadog.smoketest.springboot.controller;

import datadog.smoketest.springboot.AsyncTask;
import datadog.smoketest.springboot.grpc.AsynchronousGreeter;
import datadog.smoketest.springboot.grpc.LocalInterface;
import datadog.smoketest.springboot.grpc.SynchronousGreeter;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -16,18 +15,19 @@

@Slf4j
@RestController
public class WebController implements AutoCloseable {
public class WebController {

private final ExecutorService pool = Executors.newFixedThreadPool(5);

private final AsynchronousGreeter asyncGreeter;
private final SynchronousGreeter greeter;
private final LocalInterface localInterface;
private final AsyncTask asyncTask;

public WebController() throws IOException {
this.localInterface = new LocalInterface();
this.asyncGreeter = new AsynchronousGreeter(localInterface.getPort());
this.greeter = new SynchronousGreeter(localInterface.getPort());
public WebController(
AsynchronousGreeter asyncGreeter, SynchronousGreeter greeter, AsyncTask asyncTask) {
this.asyncGreeter = asyncGreeter;
this.greeter = greeter;
this.asyncTask = asyncTask;
}

@RequestMapping("/greeting")
Expand Down Expand Up @@ -74,10 +74,8 @@ public String call() {
return response;
}

@Override
public void close() {
localInterface.close();
greeter.close();
asyncGreeter.close();
@RequestMapping("async_annotation_greeting")
public String asyncAnnotationGreeting() {
return asyncTask.greet().join();
}
}
Loading