From 69951dcf0de51fd7fbd1fea7c1725e1b12468ee1 Mon Sep 17 00:00:00 2001 From: Alexander Tumin Date: Sat, 27 Jun 2020 22:46:50 +0300 Subject: [PATCH] Zipkin-specific observer support closes #156 --- observer.go | 53 +++++++++++++++++++++++++++++ span.go | 85 +++++++++++++++++++++++++++++++++++++---------- tracer.go | 41 ++++++++++++++--------- tracer_options.go | 12 +++++-- tracer_test.go | 54 ++++++++++++++++++++++++++---- 5 files changed, 202 insertions(+), 43 deletions(-) create mode 100644 observer.go diff --git a/observer.go b/observer.go new file mode 100644 index 0000000..13cdffa --- /dev/null +++ b/observer.go @@ -0,0 +1,53 @@ +package zipkintracer + +import ( + "time" + + "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" +) + +// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect +// options used during zipkin.Span creation +type ZipkinStartSpanOptions struct { + // Parent span context reference, if any + Parent *model.SpanContext + + // Span's start time + StartTime time.Time + + // Kind clarifies context of timestamp, duration and remoteEndpoint in a span. + Kind model.Kind + + // Tags used during span creation + Tags map[string]string + + // RemoteEndpoint used during span creation + RemoteEndpoint *model.Endpoint +} + +// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans +type ZipkinObserver interface { + // OnStartSpan is called when new Span is created. Creates and returns span observer. + // If the observer is not interested in the given span, it must return nil. + OnStartSpan(sp zipkin.Span, operationName string, options *ZipkinStartSpanOptions) ZipkinSpanObserver +} + +// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about +// other Span events. +type ZipkinSpanObserver interface { + // Callback called from zipkin.Span.SetName() + OnSetName(operationName string) + + // Callback called from zipkin.Span.SetTag() + OnSetTag(key, value string) + + // Callback called from zipkin.Span.Annotate() + OnAnnotate(t time.Time, annotation string) + + // Callback called from zipkin.Span.Finish() + OnFinish() + + // Callback called from zipkin.Span.FinishedWithDuration() + OnFinishedWithDuration(dur time.Duration) +} diff --git a/span.go b/span.go index fe7915d..97e2565 100644 --- a/span.go +++ b/span.go @@ -31,10 +31,11 @@ type FinisherWithDuration interface { } type spanImpl struct { - tracer *tracerImpl - zipkinSpan zipkin.Span - startTime time.Time - observer otobserver.SpanObserver + tracer *tracerImpl + zipkinSpan zipkin.Span + startTime time.Time + observer otobserver.SpanObserver + zipkinObserver ZipkinSpanObserver } func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { @@ -42,15 +43,15 @@ func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { s.observer.OnSetOperationName(operationName) } + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetName(operationName) + } + s.zipkinSpan.SetName(operationName) return s } func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { - if s.observer != nil { - s.observer.OnSetTag(key, value) - } - if key == string(ext.SamplingPriority) { // there are no means for now to change the sampling decision // but when finishedSpanHandler is in place we could change this. @@ -67,7 +68,17 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { return s } - s.zipkinSpan.Tag(key, fmt.Sprint(value)) + strValue := fmt.Sprint(value) + + if s.observer != nil { + s.observer.OnSetTag(key, value) + } + + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetTag(key, strValue) + } + + s.zipkinSpan.Tag(key, strValue) return s } @@ -78,7 +89,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) { } for _, field := range fields { - s.zipkinSpan.Annotate(time.Now(), field.String()) + t := time.Now() + fieldValue := field.String() + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(t, fieldValue) + } + + s.zipkinSpan.Annotate(t, fieldValue) } } @@ -88,7 +106,13 @@ func (s *spanImpl) LogFields(fields ...log.Field) { func (s *spanImpl) logFields(t time.Time, fields ...log.Field) { for _, field := range fields { - s.zipkinSpan.Annotate(t, field.String()) + annotation := field.String() + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(t, annotation) + } + + s.zipkinSpan.Annotate(t, annotation) } } @@ -110,7 +134,13 @@ func (s *spanImpl) Log(ld opentracing.LogData) { ld.Timestamp = time.Now() } - s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload)) + annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload) + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation) + } + + s.zipkinSpan.Annotate(ld.Timestamp, annotation) } func (s *spanImpl) Finish() { @@ -118,14 +148,14 @@ func (s *spanImpl) Finish() { s.observer.OnFinish(opentracing.FinishOptions{}) } + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinish() + } + s.zipkinSpan.Finish() } func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { - if s.observer != nil { - s.observer.OnFinish(opts) - } - for _, lr := range opts.LogRecords { s.logFields(lr.Timestamp, lr.Fields...) } @@ -135,11 +165,30 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { if !ok { return } - f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime)) + + dur := opts.FinishTime.Sub(s.startTime) + + if s.observer != nil { + s.observer.OnFinish(opts) + } + + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinishedWithDuration(dur) + } + + f.FinishedWithDuration(dur) return } - s.Finish() + if s.observer != nil { + s.observer.OnFinish(opts) + } + + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinish() + } + + s.zipkinSpan.Finish() } func (s *spanImpl) Tracer() opentracing.Tracer { diff --git a/tracer.go b/tracer.go index b48d92a..9f9da7d 100644 --- a/tracer.go +++ b/tracer.go @@ -58,22 +58,25 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp zopts := make([]zipkin.SpanOption, 0) + var zipkinStartSpanOptions ZipkinStartSpanOptions + // Parent if len(startSpanOptions.References) > 0 { parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext) if ok { zopts = append(zopts, zipkin.Parent(model.SpanContext(parent))) + zipkinStartSpanOptions.Parent = (*model.SpanContext)(&parent) } } startTime := time.Now() // Time if !startSpanOptions.StartTime.IsZero() { - zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime)) - startTime = startSpanOptions.StartTime + zipkinStartSpanOptions.StartTime = startSpanOptions.StartTime + zopts = append(zopts, zipkin.StartTime(zipkinStartSpanOptions.StartTime)) } - zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...) + zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &zipkinStartSpanOptions)...) newSpan := t.zipkinTracer.StartSpan(operationName, zopts...) @@ -82,19 +85,24 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp tracer: t, startTime: startTime, } + if t.opts.observer != nil { observer, _ := t.opts.observer.OnStartSpan(sp, operationName, startSpanOptions) sp.observer = observer } + if t.opts.zipkinObserver != nil { + sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, &zipkinStartSpanOptions) + } + return sp } -func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { +func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption { zopts := make([]zipkin.SpanOption, 0) - tags := map[string]string{} - remoteEndpoint := &model.Endpoint{} + options.Tags = map[string]string{} + options.RemoteEndpoint = &model.Endpoint{} var kind string if val, ok := t[string(ext.SpanKind)]; ok { @@ -112,29 +120,30 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { mKind == model.Producer || mKind == model.Consumer { zopts = append(zopts, zipkin.Kind(mKind)) + options.Kind = mKind } else { - tags["span.kind"] = kind + options.Tags["span.kind"] = kind } } if val, ok := t[string(ext.PeerService)]; ok { serviceName, _ := val.(string) - remoteEndpoint.ServiceName = serviceName + options.RemoteEndpoint.ServiceName = serviceName } if val, ok := t[string(ext.PeerHostIPv4)]; ok { ipv4, _ := val.(string) - remoteEndpoint.IPv4 = net.ParseIP(ipv4) + options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4) } if val, ok := t[string(ext.PeerHostIPv6)]; ok { ipv6, _ := val.(string) - remoteEndpoint.IPv6 = net.ParseIP(ipv6) + options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6) } if val, ok := t[string(ext.PeerPort)]; ok { port, _ := val.(uint16) - remoteEndpoint.Port = port + options.RemoteEndpoint.Port = port } for key, val := range t { @@ -146,15 +155,15 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { continue } - tags[key] = fmt.Sprint(val) + options.Tags[key] = fmt.Sprint(val) } - if len(tags) > 0 { - zopts = append(zopts, zipkin.Tags(tags)) + if len(options.Tags) > 0 { + zopts = append(zopts, zipkin.Tags(options.Tags)) } - if !remoteEndpoint.Empty() { - zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint)) + if !options.RemoteEndpoint.Empty() { + zopts = append(zopts, zipkin.RemoteEndpoint(options.RemoteEndpoint)) } return zopts diff --git a/tracer_options.go b/tracer_options.go index 190a692..ae2e312 100644 --- a/tracer_options.go +++ b/tracer_options.go @@ -31,8 +31,9 @@ const ( // TracerOptions allows creating a customized Tracer. type TracerOptions struct { - observer otobserver.Observer - b3InjectOpt B3InjectOption + observer otobserver.Observer + b3InjectOpt B3InjectOption + zipkinObserver ZipkinObserver } // TracerOption allows for functional options. @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption { } } +// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver +func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption { + return func(opts *TracerOptions) { + opts.zipkinObserver = zipkinObserver + } +} + // WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption { return func(opts *TracerOptions) { diff --git a/tracer_test.go b/tracer_test.go index 0c991d0..00aac43 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -38,7 +38,9 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) { {"span.kind": ext.SpanKindRPCServerEnum}, } for _, tags := range tagCases { - opts := parseTagsAsZipkinOptions(tags) + var zipkinStartSpanOptions ZipkinStartSpanOptions + + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -52,12 +54,18 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) { if want, have := model.Server, spans[0].Kind; want != have { t.Errorf("unexpected kind value, want %s, have %s", want, have) } + + if want, have := model.Server, zipkinStartSpanOptions.Kind; want != have { + t.Errorf("unexpected start options kind value, want %s, have %s", want, have) + } } } func TestOTKindTagIsCantBeParsed(t *testing.T) { + var zipkinStartSpanOptions ZipkinStartSpanOptions + tags := map[string]interface{}{"span.kind": "banana"} - opts := parseTagsAsZipkinOptions(tags) + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -75,13 +83,29 @@ func TestOTKindTagIsCantBeParsed(t *testing.T) { if want, have := "banana", spans[0].Tags["span.kind"]; want != have { t.Errorf("unexpected tag value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %v", zipkinStartSpanOptions.Tags) + } + + if want, have := "banana", zipkinStartSpanOptions.Tags["span.kind"]; want != have { + t.Errorf("unexpected start options tags[span.kind] value, want %s, have %s", want, have) + } } func TestOptionsFromOTTags(t *testing.T) { + var zipkinStartSpanOptions ZipkinStartSpanOptions + + const ( + sServiceA = "service_a" + sValue = "value" + sKey = "key" + ) + tags := map[string]interface{}{} - tags[string(ext.PeerService)] = "service_a" - tags["key"] = "value" - opts := parseTagsAsZipkinOptions(tags) + tags[string(ext.PeerService)] = sServiceA + tags[sKey] = sValue + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -92,11 +116,27 @@ func TestOptionsFromOTTags(t *testing.T) { t.Fatalf("unexpected number of spans, want %d, have %d", want, have) } - if want, have := "service_a", spans[0].RemoteEndpoint.ServiceName; want != have { + if want, have := sServiceA, spans[0].RemoteEndpoint.ServiceName; want != have { t.Errorf("unexpected remote service name, want %s, have %s", want, have) } - if want, have := "value", spans[0].Tags["key"]; want != have { + if want, have := sValue, spans[0].Tags[sKey]; want != have { t.Errorf("unexpected tag value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %s", zipkinStartSpanOptions.Tags) + } + + if want, have := sValue, zipkinStartSpanOptions.Tags[sKey]; want != have { + t.Errorf("unexpected start options tags[key] value, want %s, have %s", want, have) + } + + if zipkinStartSpanOptions.RemoteEndpoint == nil { + t.Errorf("unexpected start options remote endpoint value, want non-nil instance, have %v", zipkinStartSpanOptions.RemoteEndpoint) + } + + if want, have := sServiceA, zipkinStartSpanOptions.RemoteEndpoint.ServiceName; want != have { + t.Errorf("unexpected start options remote service name, want %s, have %s", want, have) + } }