Skip to content

Commit

Permalink
Zipkin-specific observer support
Browse files Browse the repository at this point in the history
  • Loading branch information
iamtakingiteasy committed Jun 28, 2020
1 parent 223664c commit b1c5a41
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 58 deletions.
56 changes: 56 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package zipkintracer

import (
"time"

"github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
)

// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect
// options used during zipkin.Span creation
type ZipkinStartSpanOptions struct {
// Parent span context reference, if any
Parent *model.SpanContext

// Span's start time
StartTime time.Time

// Kind clarifies context of timestamp, duration and remoteEndpoint in a span.
Kind model.Kind

// Tags used during span creation
Tags map[string]string

// RemoteEndpoint used during span creation
RemoteEndpoint *model.Endpoint
}

// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans
type ZipkinObserver interface {
// OnStartSpan is called when new Span is created. Creates and returns span observer.
// If the observer is not interested in the given span, it must return nil.
OnStartSpan(sp zipkin.Span, operationName string, options *ZipkinStartSpanOptions) ZipkinSpanObserver
}

// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about
// other Span events.
type ZipkinSpanObserver interface {
// Callback called from zipkin.Span.SetName()
OnSetName(operationName string)

// Callback called from zipkin.Span.SetTag()
OnSetTag(key, value string)

// Callback called from zipkin.Span.SetRemoteEndpoint()
OnSetRemoteEndpoint(remote *model.Endpoint)

// Callback called from zipkin.Span.Annotate()
OnAnnotate(t time.Time, annotation string)

// Callback called from zipkin.Span.Finish()
OnFinish()

// Callback called from zipkin.Span.FinishedWithDuration()
OnFinishedWithDuration(dur time.Duration)
}
111 changes: 85 additions & 26 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package zipkintracer

import (
"fmt"
"net"
"time"

otobserver "github.com/opentracing-contrib/go-observer"
Expand All @@ -25,23 +26,23 @@ import (
"github.com/openzipkin/zipkin-go"
)

// FinisherWithDuration allows to finish span with given duration
type FinisherWithDuration interface {
FinishedWithDuration(d time.Duration)
}

type spanImpl struct {
tracer *tracerImpl
zipkinSpan zipkin.Span
startTime time.Time
observer otobserver.SpanObserver
tracer *tracerImpl
zipkinSpan zipkin.Span
observer otobserver.SpanObserver
zipkinObserver ZipkinSpanObserver
options ZipkinStartSpanOptions
}

func (s *spanImpl) SetOperationName(operationName string) opentracing.Span {
if s.observer != nil {
s.observer.OnSetOperationName(operationName)
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetName(operationName)
}

s.zipkinSpan.SetName(operationName)
return s
}
Expand All @@ -51,23 +52,52 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
s.observer.OnSetTag(key, value)
}

if key == string(ext.SamplingPriority) {
endpointChanged := false

switch {
case key == string(ext.SamplingPriority):
// there are no means for now to change the sampling decision
// but when finishedSpanHandler is in place we could change this.
return s
case key == string(ext.SpanKind):
// this tag is translated into kind which can
// only be set on span creation
return s
case key == string(ext.PeerService):
serviceName, _ := value.(string)
s.options.RemoteEndpoint.ServiceName = serviceName
endpointChanged = true
case key == string(ext.PeerHostIPv4):
ipv4, _ := value.(string)
s.options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4)
endpointChanged = true
case key == string(ext.PeerHostIPv6):
ipv6, _ := value.(string)
s.options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6)
endpointChanged = true
case key == string(ext.PeerPort):
port, _ := value.(uint16)
s.options.RemoteEndpoint.Port = port
endpointChanged = true
}

if key == string(ext.SpanKind) ||
key == string(ext.PeerService) ||
key == string(ext.PeerHostIPv4) ||
key == string(ext.PeerHostIPv6) ||
key == string(ext.PeerPort) {
// this tags are translated into kind and remoteEndpoint which can
// only be set on span creation
if endpointChanged {
s.zipkinSpan.SetRemoteEndpoint(s.options.RemoteEndpoint)

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetRemoteEndpoint(s.options.RemoteEndpoint)
}

return s
}

s.zipkinSpan.Tag(key, fmt.Sprint(value))
strValue := fmt.Sprint(value)

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetTag(key, strValue)
}

s.zipkinSpan.Tag(key, strValue)
return s
}

Expand All @@ -78,7 +108,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
}

for _, field := range fields {
s.zipkinSpan.Annotate(time.Now(), field.String())
t := time.Now()
fieldValue := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, fieldValue)
}

s.zipkinSpan.Annotate(t, fieldValue)
}
}

Expand All @@ -88,7 +125,13 @@ func (s *spanImpl) LogFields(fields ...log.Field) {

func (s *spanImpl) logFields(t time.Time, fields ...log.Field) {
for _, field := range fields {
s.zipkinSpan.Annotate(t, field.String())
annotation := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, annotation)
}

s.zipkinSpan.Annotate(t, annotation)
}
}

Expand All @@ -110,14 +153,24 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload))
annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload)

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation)
}

s.zipkinSpan.Annotate(ld.Timestamp, annotation)
}

func (s *spanImpl) Finish() {
if s.observer != nil {
s.observer.OnFinish(opentracing.FinishOptions{})
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

Expand All @@ -131,15 +184,21 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
}

if !opts.FinishTime.IsZero() {
f, ok := s.zipkinSpan.(FinisherWithDuration)
if !ok {
return
dur := opts.FinishTime.Sub(s.options.StartTime)

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinishedWithDuration(dur)
}
f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime))

s.zipkinSpan.FinishedWithDuration(dur)
return
}

s.Finish()
if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

func (s *spanImpl) Tracer() opentracing.Tracer {
Expand Down
51 changes: 28 additions & 23 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,43 +58,47 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp

zopts := make([]zipkin.SpanOption, 0)

sp := &spanImpl{
tracer: t,
}

// Parent
if len(startSpanOptions.References) > 0 {
parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext)
if ok {
zopts = append(zopts, zipkin.Parent(model.SpanContext(parent)))
sp.options.Parent = (*model.SpanContext)(&parent)
}
}

startTime := time.Now()
// Time
sp.options.StartTime = time.Now()
if !startSpanOptions.StartTime.IsZero() {
zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime))
startTime = startSpanOptions.StartTime
sp.options.StartTime = startSpanOptions.StartTime
zopts = append(zopts, zipkin.StartTime(sp.options.StartTime))
}

zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...)
zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &sp.options)...)

newSpan := t.zipkinTracer.StartSpan(operationName, zopts...)
sp.zipkinSpan = t.zipkinTracer.StartSpan(operationName, zopts...)

sp := &spanImpl{
zipkinSpan: newSpan,
tracer: t,
startTime: startTime,
}
if t.opts.observer != nil {
observer, _ := t.opts.observer.OnStartSpan(sp, operationName, startSpanOptions)
sp.observer = observer
}

if t.opts.zipkinObserver != nil {
sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, &sp.options)
}

return sp
}

func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption {
zopts := make([]zipkin.SpanOption, 0)

tags := map[string]string{}
remoteEndpoint := &model.Endpoint{}
options.Tags = map[string]string{}
options.RemoteEndpoint = &model.Endpoint{}

var kind string
if val, ok := t[string(ext.SpanKind)]; ok {
Expand All @@ -112,29 +116,30 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
mKind == model.Producer ||
mKind == model.Consumer {
zopts = append(zopts, zipkin.Kind(mKind))
options.Kind = mKind
} else {
tags["span.kind"] = kind
options.Tags["span.kind"] = kind
}
}

if val, ok := t[string(ext.PeerService)]; ok {
serviceName, _ := val.(string)
remoteEndpoint.ServiceName = serviceName
options.RemoteEndpoint.ServiceName = serviceName
}

if val, ok := t[string(ext.PeerHostIPv4)]; ok {
ipv4, _ := val.(string)
remoteEndpoint.IPv4 = net.ParseIP(ipv4)
options.RemoteEndpoint.IPv4 = net.ParseIP(ipv4)
}

if val, ok := t[string(ext.PeerHostIPv6)]; ok {
ipv6, _ := val.(string)
remoteEndpoint.IPv6 = net.ParseIP(ipv6)
options.RemoteEndpoint.IPv6 = net.ParseIP(ipv6)
}

if val, ok := t[string(ext.PeerPort)]; ok {
port, _ := val.(uint16)
remoteEndpoint.Port = port
options.RemoteEndpoint.Port = port
}

for key, val := range t {
Expand All @@ -146,15 +151,15 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
continue
}

tags[key] = fmt.Sprint(val)
options.Tags[key] = fmt.Sprint(val)
}

if len(tags) > 0 {
zopts = append(zopts, zipkin.Tags(tags))
if len(options.Tags) > 0 {
zopts = append(zopts, zipkin.Tags(options.Tags))
}

if !remoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint))
if !options.RemoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(options.RemoteEndpoint))
}

return zopts
Expand Down
12 changes: 10 additions & 2 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (

// TracerOptions allows creating a customized Tracer.
type TracerOptions struct {
observer otobserver.Observer
b3InjectOpt B3InjectOption
observer otobserver.Observer
b3InjectOpt B3InjectOption
zipkinObserver ZipkinObserver
}

// TracerOption allows for functional options.
Expand All @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption {
}
}

// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver
func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption {
return func(opts *TracerOptions) {
opts.zipkinObserver = zipkinObserver
}
}

// WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier
func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption {
return func(opts *TracerOptions) {
Expand Down
Loading

0 comments on commit b1c5a41

Please sign in to comment.