Skip to content

Commit

Permalink
updating WithExtractedSpanLinks
Browse files Browse the repository at this point in the history
  • Loading branch information
mhlidd committed Oct 31, 2024
1 parent 79e9a24 commit 0aec80a
Show file tree
Hide file tree
Showing 13 changed files with 39 additions and 32 deletions.
6 changes: 4 additions & 2 deletions contrib/IBM/sarama.v1/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
// kafka supports headers, so try to extract a span context
carrier := NewConsumerMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
Expand Down Expand Up @@ -298,7 +299,8 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
}
// if there's a span context in the headers, use that as the parent
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
span := tracer.StartSpan(cfg.producerSpanName, opts...)
if version.IsAtLeast(sarama.V0_11_0_0) {
Expand Down
6 changes: 4 additions & 2 deletions contrib/Shopify/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
// kafka supports headers, so try to extract a span context
carrier := NewConsumerMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
next := tracer.StartSpan(cfg.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
Expand Down Expand Up @@ -301,7 +302,8 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
}
// if there's a span context in the headers, use that as the parent
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
span := tracer.StartSpan(cfg.producerSpanName, opts...)
if version.IsAtLeast(sarama.V0_11_0_0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func TraceReceiveFunc(s Subscription, opts ...Option) func(ctx context.Context,
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub),
tracer.ChildOfWithExtractedSpanLinks(parentSpanCtx),
tracer.WithExtractedSpanLinks(parentSpanCtx),
tracer.ChildOf(parentSpanCtx),
}
if cfg.serviceName != "" {
opts = append(opts, tracer.ServiceName(cfg.serviceName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (tr *KafkaTracer) StartConsumeSpan(msg Message) ddtrace.Span {
// kafka supports headers, so try to extract a span context
carrier := MessageCarrier{msg: msg}
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(tr.ctx, tr.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func (tr *KafkaTracer) StartProduceSpan(msg Message) ddtrace.Span {
// if there's a span context in the headers, use that as the parent
carrier := NewMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(tr.ctx, tr.producerSpanName, opts...)
// inject the span context so consumers can pick it up
Expand Down
3 changes: 2 additions & 1 deletion contrib/gofiber/fiber.v2/fiber.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func Middleware(opts ...Option) func(c *fiber.Ctx) error {
}
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(h)); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
opts = append(opts, cfg.spanOpts...)
opts = append(opts,
Expand Down
3 changes: 2 additions & 1 deletion contrib/google.golang.org/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func startSpanFromContext(
)
md, _ := metadata.FromIncomingContext(ctx) // nil is ok
if sctx, err := tracer.Extract(grpcutil.MDCarrier(md)); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(sctx))
opts = append(opts, tracer.WithExtractedSpanLinks(sctx))
opts = append(opts, tracer.ChildOf(sctx))
}
return tracer.StartSpanFromContext(ctx, operation, opts...)
}
Expand Down
3 changes: 2 additions & 1 deletion contrib/internal/httptrace/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func StartRequestSpan(r *http.Request, opts ...ddtrace.StartSpanOption) (tracer.
cfg.Tags["http.host"] = r.Host
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil {
tracer.ChildOfWithExtractedSpanLinks(spanctx)(cfg) //TODO: ensure that this line is performing the expected functionality
tracer.WithExtractedSpanLinks(spanctx)(cfg)
tracer.ChildOf(spanctx)(cfg)
}
for k, v := range ipTags {
cfg.Tags[k] = v
Expand Down
3 changes: 2 additions & 1 deletion contrib/segmentio/kafka.go.v0/internal/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (tr *Tracer) StartConsumeSpan(ctx context.Context, msg Message) ddtrace.Spa
// kafka supports headers, so try to extract a span context
carrier := NewMessageCarrier(msg)
if spanctx, err := tracer.Extract(carrier); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}
span, _ := tracer.StartSpanFromContext(ctx, tr.consumerSpanName, opts...)
// reinject the span context so consumers can pick it up
Expand Down
6 changes: 4 additions & 2 deletions contrib/twitchtv/twirp/twirp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func (wc *wrappedClient) Do(req *http.Request) (*http.Response, error) {
opts = append(opts, tracer.Tag(ext.EventSampleRate, wc.cfg.analyticsRate))
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(req.Header)); err == nil {
opts = append(opts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.WithExtractedSpanLinks(spanctx))
opts = append(opts, tracer.ChildOf(spanctx))
}

span, ctx := tracer.StartSpanFromContext(req.Context(), wc.cfg.spanName, opts...)
Expand Down Expand Up @@ -139,7 +140,8 @@ func WrapServer(h http.Handler, opts ...Option) http.Handler {
spanOpts = append(spanOpts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil {
spanOpts = append(spanOpts, tracer.ChildOfWithExtractedSpanLinks(spanctx))
spanOpts = append(spanOpts, tracer.WithExtractedSpanLinks(spanctx))
spanOpts = append(spanOpts, tracer.ChildOf(spanctx))
}
span, ctx := tracer.StartSpanFromContext(r.Context(), "twirp.handler", spanOpts...)
defer span.Finish()
Expand Down
3 changes: 2 additions & 1 deletion contrib/valyala/fasthttp.v1/fasthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func WrapHandler(h fasthttp.RequestHandler, opts ...Option) fasthttp.RequestHand
ReqHeader: &fctx.Request.Header,
}
if sctx, err := tracer.Extract(fcc); err == nil {
spanOpts = append(spanOpts, tracer.ChildOfWithExtractedSpanLinks(sctx))
spanOpts = append(spanOpts, tracer.WithExtractedSpanLinks(sctx))
spanOpts = append(spanOpts, tracer.ChildOf(sctx))
}
span := fasthttptrace.StartSpanFromContext(fctx, "http.request", spanOpts...)
defer span.Finish()
Expand Down
20 changes: 7 additions & 13 deletions ddtrace/tracer/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,26 +85,20 @@ func TestStartSpanFromContextWithSpanLinks(t *testing.T) {
defer stop()
spanLink := ddtrace.SpanLink{TraceID: 789, TraceIDHigh: 0, SpanID: 789, Attributes: map[string]string{"reason": "terminated_context", "context_headers": "datadog"}, Flags: 0}
spanLinkContext := &spanContext{spanID: 789, traceID: traceIDFrom64Bits(789), spanLinks: []ddtrace.SpanLink{spanLink}}
child, ctx := StartSpanFromContext(
child, _ := StartSpanFromContext(
context.Background(),
"http.request",
ChildOfWithExtractedSpanLinks(spanLinkContext),
WithExtractedSpanLinks(spanLinkContext),
ChildOf(spanLinkContext),
)
assert := assert.New(t)

got, ok := child.(*span)
assert.True(ok)
gotctx, ok := SpanFromContext(ctx)
//checking that a span links are added to a child span that is created where span links are passed as an StartSpanOption
childSpan, ok := child.(*span)
assert.True(ok)
assert.Equal(gotctx, got)
_, ok = gotctx.(*traceinternal.NoopSpan)
assert.False(ok)

assert.Equal(uint64(789), got.TraceID)
assert.Equal(uint64(789), got.ParentID)
assert.Equal("http.request", got.Name)
assert.Equal(1, len(got.SpanLinks))
assert.Equal(spanLink, got.SpanLinks[0])
assert.Equal(1, len(childSpan.SpanLinks))
assert.Equal(spanLink, childSpan.SpanLinks[0])
}

func TestStartSpanFromContextRace(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,13 +1209,12 @@ func ChildOf(ctx ddtrace.SpanContext) StartSpanOption {
}
}

func ChildOfWithExtractedSpanLinks(ctx ddtrace.SpanContext) StartSpanOption {
func WithExtractedSpanLinks(ctx ddtrace.SpanContext) StartSpanOption {
return func(cfg *ddtrace.StartSpanConfig) {
if spanCtx, ok := ctx.(*spanContext); ok && spanCtx != nil {
cfg.SpanLinks = spanCtx.spanLinks
spanCtx.spanLinks = nil
if spanCtx, ok := ctx.(*spanContext); ok && spanCtx != nil && spanCtx.spanLinks != nil {
WithSpanLinks(spanCtx.spanLinks)(cfg)
spanCtx.spanLinks = nil //Extracted span links should not belong in the parent context of a span when it is started
}
cfg.Parent = ctx
}
}

Expand Down

0 comments on commit 0aec80a

Please sign in to comment.