From 63c96398a463ded61c36df30cb3ae7af67795eea Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 13 Feb 2017 23:06:59 +0530 Subject: [PATCH 1/5] Monitoring platform events with zipkin-go/opentracing Perf events can provide additional insights into the performance of a span or a sequence of spans in a trace. This patch includes the use of a library called "perfevents" (https://github.com/opentracing-contrib/perfevents) which can be simply imported and used. This library is an abstraction of the platform side of counters and operations. Right now, only the generic perf events are supported. With this patch, one should be able to find out the cpu-cycles, instructions, cache-references, cache-misses, branch-instructions, branch-misses and bus cycles for a span. The required event(s) should be supplied in the form of a string(comma separated list if there are multiple events to be monitored) to the StartSpan call and then, the opentracing-go library and the zipkin-go-opentracing library should take over from there. As soon as the span is closed, the perf event(s) count is logged with span.Log(). An example to use this with a span : // start a span with perfevents sp = tracer.StartSpan("read_file", opentracing.PerfString("cache-misses,cpu-cycles")) Upon closing the span, the events stop monitoring and are destroyed afer the data are sent for logging. Signed-off-by: Hemant Kumar --- span.go | 15 +++++++++++++++ tracer.go | 8 ++++++++ 2 files changed, 23 insertions(+) diff --git a/span.go b/span.go index 7fa78a9..9f6d006 100644 --- a/span.go +++ b/span.go @@ -9,6 +9,7 @@ import ( "github.com/opentracing/opentracing-go/log" "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" + "github.com/opentracing-contrib/perfevents/go" ) // Span provides access to the essential details of the span, for use @@ -34,6 +35,8 @@ type spanImpl struct { // The number of logs dropped because of MaxLogsPerSpan. numDroppedLogs int Endpoint *zipkincore.Endpoint + // perfevent desc (contains information for perf events created) + EventDescs []perfevents.PerfEventInfo } var spanPool = &sync.Pool{New: func() interface{} { @@ -184,6 +187,18 @@ func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { } func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { + // log and close the perf events firs, if any, since, we don't want + // to account for the code to finish up the span. + perfevents.EventsRead(s.EventDescs) + for _,event := range s.EventDescs { + // In case of an error for an event, event.EventName will + // contain "" for an event. + if event.EventName != "" { + s.LogEvent(event.EventName + ": " + perfevents.FormatDataToString(event)) + } + } + perfevents.EventsDisableClose(s.EventDescs) + finishTime := opts.FinishTime if finishTime.IsZero() { finishTime = time.Now() diff --git a/tracer.go b/tracer.go index e9c4f0c..61fd9bb 100644 --- a/tracer.go +++ b/tracer.go @@ -8,6 +8,7 @@ import ( "github.com/opentracing/opentracing-go/ext" "github.com/openzipkin/zipkin-go-opentracing/flag" + "github.com/opentracing-contrib/perfevents/go" ) // ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host @@ -283,9 +284,16 @@ func (t *tracerImpl) startSpanWithOptions( // Tags. tags := opts.Tags + // Perfevents + perfevent := opts.Perfevent + // Build the new span. This is the only allocation: We'll return this as // an opentracing.Span. sp := t.getSpan() + + // start monitoring for the perf event(s), if any + _, _, sp.EventDescs = perfevents.InitOpenEventsEnableSelf(perfevent) + // Look for a parent in the list of References. // // TODO: would be nice if basictracer did something with all From ddf037c0f84994b6e01469c06dc2e504b4943041 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 20 Feb 2017 23:38:45 +0530 Subject: [PATCH 2/5] Adjust to the new spec for perfevents Use PerfEvent tag to check if perfevents needs to be initialized, if yes, initialize them. Signed-off-by: Hemant Kumar --- span.go | 7 +++++++ tracer.go | 13 ++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/span.go b/span.go index 9f6d006..32ae0ff 100644 --- a/span.go +++ b/span.go @@ -86,6 +86,13 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { return s } } + + if key == string(ext.PerfEvent) { + if v, ok := value.(string); ok { + _, _, s.EventDescs = perfevents.InitOpenEventsEnableSelf(v) + } + } + if s.trim() { return s } diff --git a/tracer.go b/tracer.go index 61fd9bb..3aa0cac 100644 --- a/tracer.go +++ b/tracer.go @@ -8,7 +8,6 @@ import ( "github.com/opentracing/opentracing-go/ext" "github.com/openzipkin/zipkin-go-opentracing/flag" - "github.com/opentracing-contrib/perfevents/go" ) // ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host @@ -284,16 +283,10 @@ func (t *tracerImpl) startSpanWithOptions( // Tags. tags := opts.Tags - // Perfevents - perfevent := opts.Perfevent - // Build the new span. This is the only allocation: We'll return this as // an opentracing.Span. sp := t.getSpan() - // start monitoring for the perf event(s), if any - _, _, sp.EventDescs = perfevents.InitOpenEventsEnableSelf(perfevent) - // Look for a parent in the list of References. // // TODO: would be nice if basictracer did something with all @@ -384,6 +377,12 @@ func (t *tracerImpl) startSpanInternal( sp.raw.Start = startTime sp.raw.Duration = -1 sp.raw.Tags = tags + for k,v := range tags { + if k == string(ext.PerfEvent) { + sp.SetTag(k, v) + } + } + if t.options.debugAssertSingleGoroutine { sp.SetTag(debugGoroutineIDTag, curGoroutineID()) } From 4e3a6d3c24516e614740c9cd216c0bff977a8851 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Tue, 21 Mar 2017 12:42:57 +0530 Subject: [PATCH 3/5] Add an Observer API and its implementation Metrics are useful to gain insights in a distributed application. But there can be a lot of metrics in different domains. Adding such metrics from one domain (metrics exporter pkg) into zipkin is not good for cross platform compatibility. This commit adds a new observer and span observer API which defines a standard interface to add such metrics. To expose the metrics, one would have to implement the observer interface in the metrics exporter. zipkin will need to implement this interface and install callbacks for methods such as StartSpan, SetOperationName, SetTag and Finish. The registered callbacks would then be called on the span events if an observer is created. This is based on the work done by Yuri here : https://github.com/uber/jaeger-client-go/pull/94 Signed-off-by: Yuri Shkuro Signed-off-by: Hemant Kumar --- observer.go | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++ span.go | 34 ++++++++------------ tracer.go | 19 +++++++++--- 3 files changed, 115 insertions(+), 27 deletions(-) create mode 100644 observer.go diff --git a/observer.go b/observer.go new file mode 100644 index 0000000..29e66df --- /dev/null +++ b/observer.go @@ -0,0 +1,89 @@ +package zipkintracer + +import ( + opentracing "github.com/opentracing/opentracing-go" +) + +// Observer can be registered with the zipkin to recieve notifications +// about new Spans. +// The actual registration depends on the implementation, which might look +// like the below e.g : +// observer := myobserver.NewObserver() +// tracer := zipkin.NewTracer(..., zipkin.WithObserver(observer)) +// +type Observer interface { + // Create and return a span observer. Called when a span starts. + // E.g : + // func StartSpan(opName string, opts ...opentracing.StartSpanOption) { + // var sp opentracing.Span + // sso := opentracing.StartSpanOptions{} + // var spObs opentracing.SpanObserver = observer.OnStartSpan(span, opName, sso) + // ... + // } + // OnStartSpan function needs to be defined for a package exporting + // metrics as well. + OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) SpanObserver +} + +// SpanObserver is created by the Observer and receives notifications about +// other Span events. +// zipkin should define these functions for each of the span operations +// which should call the registered (observer) callbacks. +type SpanObserver interface { + // Callback called from opentracing.Span.SetOperationName() + OnSetOperationName(operationName string) + // Callback called from opentracing.Span.SetTag() + OnSetTag(key string, value interface{}) + // Callback called from opentracing.Span.Finish() + OnFinish(options opentracing.FinishOptions) +} + +// observer is a dispatcher to other observers +type observer struct { + observers []Observer +} + +// spanObserver is a dispatcher to other span observers +type spanObserver struct { + observers []SpanObserver +} + +// noopSpanObserver is used when there are no observers registered on the +// Tracer or none of them returns span observers +var noopSpanObserver = spanObserver{} + +func (o observer) OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) SpanObserver { + var spanObservers []SpanObserver + for _, obs := range o.observers { + spanObs := obs.OnStartSpan(sp, operationName, options) + if spanObs != nil { + if spanObservers == nil { + spanObservers = make([]SpanObserver, 0, len(o.observers)) + } + spanObservers = append(spanObservers, spanObs) + } + } + if len(spanObservers) == 0 { + return noopSpanObserver + } + + return spanObserver{observers: spanObservers} +} + +func (o spanObserver) OnSetOperationName(operationName string) { + for _, obs := range o.observers { + obs.OnSetOperationName(operationName) + } +} + +func (o spanObserver) OnSetTag(key string, value interface{}) { + for _, obs := range o.observers { + obs.OnSetTag(key, value) + } +} + +func (o spanObserver) OnFinish(options opentracing.FinishOptions) { + for _, obs := range o.observers { + obs.OnFinish(options) + } +} diff --git a/span.go b/span.go index 32ae0ff..ffa4cbe 100644 --- a/span.go +++ b/span.go @@ -9,7 +9,6 @@ import ( "github.com/opentracing/opentracing-go/log" "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" - "github.com/opentracing-contrib/perfevents/go" ) // Span provides access to the essential details of the span, for use @@ -30,13 +29,12 @@ type Span interface { type spanImpl struct { tracer *tracerImpl event func(SpanEvent) + Observer SpanObserver sync.Mutex // protects the fields below raw RawSpan // The number of logs dropped because of MaxLogsPerSpan. numDroppedLogs int Endpoint *zipkincore.Endpoint - // perfevent desc (contains information for perf events created) - EventDescs []perfevents.PerfEventInfo } var spanPool = &sync.Pool{New: func() interface{} { @@ -66,6 +64,9 @@ func (s *spanImpl) reset() { } func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { + if s.Observer != nil { + s.Observer.OnSetOperationName(operationName) + } s.Lock() defer s.Unlock() s.raw.Operation = operationName @@ -78,6 +79,10 @@ func (s *spanImpl) trim() bool { func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { defer s.onTag(key, value) + if s.Observer != nil { + s.Observer.OnSetTag(key, value) + } + s.Lock() defer s.Unlock() if key == string(ext.SamplingPriority) { @@ -86,13 +91,6 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { return s } } - - if key == string(ext.PerfEvent) { - if v, ok := value.(string); ok { - _, _, s.EventDescs = perfevents.InitOpenEventsEnableSelf(v) - } - } - if s.trim() { return s } @@ -194,24 +192,16 @@ func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { } func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { - // log and close the perf events firs, if any, since, we don't want - // to account for the code to finish up the span. - perfevents.EventsRead(s.EventDescs) - for _,event := range s.EventDescs { - // In case of an error for an event, event.EventName will - // contain "" for an event. - if event.EventName != "" { - s.LogEvent(event.EventName + ": " + perfevents.FormatDataToString(event)) - } - } - perfevents.EventsDisableClose(s.EventDescs) - finishTime := opts.FinishTime if finishTime.IsZero() { finishTime = time.Now() } duration := finishTime.Sub(s.raw.Start) + if s.Observer != nil { + s.Observer.OnFinish(opts) + } + s.Lock() defer s.Unlock() diff --git a/tracer.go b/tracer.go index 3aa0cac..9f1a6fa 100644 --- a/tracer.go +++ b/tracer.go @@ -107,6 +107,8 @@ type TracerOptions struct { // Regardless of this setting, the library will propagate and support both // 64 and 128 bit incoming traces from upstream sources. traceID128Bit bool + + observer Observer } // TracerOption allows for functional options. @@ -228,6 +230,7 @@ func NewTracer(recorder SpanRecorder, options ...TracerOption) (opentracing.Trac debugMode: false, traceID128Bit: false, maxLogsPerSpan: 10000, + observer: nil, } for _, o := range options { err := o(opts) @@ -287,6 +290,10 @@ func (t *tracerImpl) startSpanWithOptions( // an opentracing.Span. sp := t.getSpan() + if t.options.observer != nil { + sp.Observer = t.options.observer.OnStartSpan(sp, operationName, opts) + } + // Look for a parent in the list of References. // // TODO: would be nice if basictracer did something with all @@ -377,11 +384,6 @@ func (t *tracerImpl) startSpanInternal( sp.raw.Start = startTime sp.raw.Duration = -1 sp.raw.Tags = tags - for k,v := range tags { - if k == string(ext.PerfEvent) { - sp.SetTag(k, v) - } - } if t.options.debugAssertSingleGoroutine { sp.SetTag(debugGoroutineIDTag, curGoroutineID()) @@ -424,3 +426,10 @@ func (t *tracerImpl) Extract(format interface{}, carrier interface{}) (opentraci func (t *tracerImpl) Options() TracerOptions { return t.options } + +func WithObserver(observer Observer) TracerOption { + return func(opts *TracerOptions) error { + opts.observer = observer + return nil + } +} From e3d109a12eab19180a5bdb123ade80e25a8d313b Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Fri, 31 Mar 2017 12:42:28 +0530 Subject: [PATCH 4/5] Add a comment for the exported func WithObserver --- tracer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tracer.go b/tracer.go index 9f1a6fa..46d9eaa 100644 --- a/tracer.go +++ b/tracer.go @@ -427,6 +427,7 @@ func (t *tracerImpl) Options() TracerOptions { return t.options } +// WithObserver assigns an initialized observer to opts.observer func WithObserver(observer Observer) TracerOption { return func(opts *TracerOptions) error { opts.observer = observer From 7552a47402712b4d62c253d047d06030678798b5 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 22 Jun 2017 21:16:02 +0530 Subject: [PATCH 5/5] Use the go-observer interface --- observer.go | 57 ++++++++++------------------------------------------- span.go | 15 +++++++------- tracer.go | 7 ++++--- 3 files changed, 22 insertions(+), 57 deletions(-) diff --git a/observer.go b/observer.go index 29e66df..d0b5da6 100644 --- a/observer.go +++ b/observer.go @@ -2,72 +2,35 @@ package zipkintracer import ( opentracing "github.com/opentracing/opentracing-go" + otobserver "github.com/opentracing-contrib/go-observer" ) -// Observer can be registered with the zipkin to recieve notifications -// about new Spans. -// The actual registration depends on the implementation, which might look -// like the below e.g : -// observer := myobserver.NewObserver() -// tracer := zipkin.NewTracer(..., zipkin.WithObserver(observer)) -// -type Observer interface { - // Create and return a span observer. Called when a span starts. - // E.g : - // func StartSpan(opName string, opts ...opentracing.StartSpanOption) { - // var sp opentracing.Span - // sso := opentracing.StartSpanOptions{} - // var spObs opentracing.SpanObserver = observer.OnStartSpan(span, opName, sso) - // ... - // } - // OnStartSpan function needs to be defined for a package exporting - // metrics as well. - OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) SpanObserver -} - -// SpanObserver is created by the Observer and receives notifications about -// other Span events. -// zipkin should define these functions for each of the span operations -// which should call the registered (observer) callbacks. -type SpanObserver interface { - // Callback called from opentracing.Span.SetOperationName() - OnSetOperationName(operationName string) - // Callback called from opentracing.Span.SetTag() - OnSetTag(key string, value interface{}) - // Callback called from opentracing.Span.Finish() - OnFinish(options opentracing.FinishOptions) -} - // observer is a dispatcher to other observers type observer struct { - observers []Observer + observers []otobserver.Observer } // spanObserver is a dispatcher to other span observers type spanObserver struct { - observers []SpanObserver + observers []otobserver.SpanObserver } -// noopSpanObserver is used when there are no observers registered on the -// Tracer or none of them returns span observers -var noopSpanObserver = spanObserver{} - -func (o observer) OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) SpanObserver { - var spanObservers []SpanObserver +func (o observer) OnStartSpan(sp opentracing.Span, operationName string, options opentracing.StartSpanOptions) (otobserver.SpanObserver, bool) { + var spanObservers []otobserver.SpanObserver for _, obs := range o.observers { - spanObs := obs.OnStartSpan(sp, operationName, options) - if spanObs != nil { + spanObs, ok := obs.OnStartSpan(sp, operationName, options) + if ok { if spanObservers == nil { - spanObservers = make([]SpanObserver, 0, len(o.observers)) + spanObservers = make([]otobserver.SpanObserver, 0, len(o.observers)) } spanObservers = append(spanObservers, spanObs) } } if len(spanObservers) == 0 { - return noopSpanObserver + return nil, false } - return spanObserver{observers: spanObservers} + return spanObserver{observers: spanObservers}, true } func (o spanObserver) OnSetOperationName(operationName string) { diff --git a/span.go b/span.go index ffa4cbe..0f5a669 100644 --- a/span.go +++ b/span.go @@ -9,6 +9,7 @@ import ( "github.com/opentracing/opentracing-go/log" "github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore" + otobserver "github.com/opentracing-contrib/go-observer" ) // Span provides access to the essential details of the span, for use @@ -29,7 +30,7 @@ type Span interface { type spanImpl struct { tracer *tracerImpl event func(SpanEvent) - Observer SpanObserver + observer otobserver.SpanObserver sync.Mutex // protects the fields below raw RawSpan // The number of logs dropped because of MaxLogsPerSpan. @@ -64,8 +65,8 @@ func (s *spanImpl) reset() { } func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { - if s.Observer != nil { - s.Observer.OnSetOperationName(operationName) + if s.observer != nil { + s.observer.OnSetOperationName(operationName) } s.Lock() defer s.Unlock() @@ -79,8 +80,8 @@ func (s *spanImpl) trim() bool { func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { defer s.onTag(key, value) - if s.Observer != nil { - s.Observer.OnSetTag(key, value) + if s.observer != nil { + s.observer.OnSetTag(key, value) } s.Lock() @@ -198,8 +199,8 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { } duration := finishTime.Sub(s.raw.Start) - if s.Observer != nil { - s.Observer.OnFinish(opts) + if s.observer != nil { + s.observer.OnFinish(opts) } s.Lock() diff --git a/tracer.go b/tracer.go index 46d9eaa..1e4393d 100644 --- a/tracer.go +++ b/tracer.go @@ -8,6 +8,7 @@ import ( "github.com/opentracing/opentracing-go/ext" "github.com/openzipkin/zipkin-go-opentracing/flag" + otobserver "github.com/opentracing-contrib/go-observer" ) // ErrInvalidEndpoint will be thrown if hostPort parameter is corrupted or host @@ -108,7 +109,7 @@ type TracerOptions struct { // 64 and 128 bit incoming traces from upstream sources. traceID128Bit bool - observer Observer + observer otobserver.Observer } // TracerOption allows for functional options. @@ -291,7 +292,7 @@ func (t *tracerImpl) startSpanWithOptions( sp := t.getSpan() if t.options.observer != nil { - sp.Observer = t.options.observer.OnStartSpan(sp, operationName, opts) + sp.observer, _ = t.options.observer.OnStartSpan(sp, operationName, opts) } // Look for a parent in the list of References. @@ -428,7 +429,7 @@ func (t *tracerImpl) Options() TracerOptions { } // WithObserver assigns an initialized observer to opts.observer -func WithObserver(observer Observer) TracerOption { +func WithObserver(observer otobserver.Observer) TracerOption { return func(opts *TracerOptions) error { opts.observer = observer return nil