diff --git a/Gopkg.lock b/Gopkg.lock index ec054c6e..29fb3e22 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -51,6 +51,16 @@ revision = "1949ddbfd147afd4d964a9f00b24eb291e0e7c38" version = "v1.0.2" +[[projects]] + name = "github.com/openzipkin/zipkin-go" + packages = [ + "model", + "reporter", + "reporter/http" + ] + revision = "f197ec29e729f226d23370ea60f0e49b8f44ccf4" + version = "v0.1.0" + [[projects]] name = "github.com/pkg/errors" packages = ["."] @@ -159,6 +169,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "f9dcfaf37a785c5dac1e20c29605eda29a83ba9c6f8842e92960dc94c8c4ff80" + inputs-digest = "7ac0a85916ecff7ebae7dcacaa4e6865cbfaa38dcad664e48ad41d66a12b64f7" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index baf7a6bd..d2719cac 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -25,3 +25,7 @@ [[constraint]] name = "go.uber.org/zap" version = "^1" + +[[constraint]] + name = "github.com/openzipkin/zipkin-go" + version = "0.1.0" diff --git a/README.md b/README.md index 16b04454..8ca04de6 100644 --- a/README.md +++ b/README.md @@ -150,6 +150,8 @@ The remote reporter uses "transports" to actually send the spans out of process. Currently the supported transports include: * [Jaeger Thrift](https://github.com/jaegertracing/jaeger-idl/blob/master/thrift/agent.thrift) over UDP or HTTP, * [Zipkin Thrift](https://github.com/jaegertracing/jaeger-idl/blob/master/thrift/zipkincore.thrift) over HTTP. + * [Zipkin v2 Reporter](./zipkin/reporter.go) that can send spans using any + Zipkin Reporter instance, which support a variety of transports ### Sampling diff --git a/glide.lock b/glide.lock index d76b1536..d563c41a 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 3accf84f97bff4a91162736104c0e9b9790820712bd86db6fec5e665f7196a82 -updated: 2018-04-30T11:46:43.804556-04:00 +hash: a24740e350f0260f56ed77cf333a70ec33c7fd05df4732f152532137f189e72a +updated: 2018-05-14T14:13:47.874224-04:00 imports: - name: github.com/beorn7/perks version: 3a771d992973f24aa725d07868b467d1ddfceafb @@ -29,6 +29,12 @@ imports: subpackages: - ext - log +- name: github.com/openzipkin/zipkin-go + version: f197ec29e729f226d23370ea60f0e49b8f44ccf4 + subpackages: + - model + - reporter + - reporter/http - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d - name: github.com/pmezard/go-difflib @@ -62,17 +68,17 @@ imports: - require - suite - name: github.com/uber/jaeger-lib - version: 4267858c0679cd4e47cefed8d7f70fd386cfb567 + version: ed3a127ec5fef7ae9ea95b01b542c47fbd999ce5 subpackages: - metrics - metrics/prometheus - metrics/testutils - name: go.uber.org/atomic - version: 8474b86a5a6f79c443ce4b2992817ff32cf208b8 + version: 1ea20fb1cbb1cc08cbd0d913a96dead89aa18289 - name: go.uber.org/multierr version: 3c4937480c32f4c13a875a1829af76c98ca3d40a - name: go.uber.org/zap - version: eeedf312bc6c57391d84767a4cd413f02a917974 + version: f85c78b1dd998214c5f2138155b320a4a43fbe36 subpackages: - buffer - internal/bufferpool diff --git a/glide.yaml b/glide.yaml index 6637da21..c10d23bd 100644 --- a/glide.yaml +++ b/glide.yaml @@ -12,6 +12,11 @@ import: - metrics - package: github.com/pkg/errors version: ~0.8.0 +- package: github.com/openzipkin/zipkin-go + version: ~0.1.0 + subpackages: + - reporter + - model testImport: - package: github.com/stretchr/testify subpackages: @@ -20,3 +25,7 @@ testImport: - suite - package: github.com/prometheus/client_golang version: v0.8.0 +- package: github.com/openzipkin/zipkin-go + version: ~0.1.0 + subpackages: + - reporter/http diff --git a/reporter.go b/reporter.go index fe6288c4..cc327173 100644 --- a/reporter.go +++ b/reporter.go @@ -245,10 +245,10 @@ func (r *remoteReporter) sendCloseEvent() { wg.Wait() } -// processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer. -// When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger. -// Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped -// reporting new spans. +// processQueue reads spans from the queue, converts them, and stores them in an internal buffer. +// When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger +// or Zipkin. Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the +// tracer stopped reporting new spans. func (r *remoteReporter) processQueue() { // flush causes the Sender to flush its accumulated spans and clear the buffer flush := func() { diff --git a/utils/utils.go b/utils/utils.go index ac3c325d..4563c836 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -75,6 +75,13 @@ func PackIPAsUint32(ip net.IP) uint32 { return 0 } +// UnpackUint32AsIP does the reverse of PackIPAsUint32 +func UnpackUint32AsIP(ip uint32) net.IP { + localIP := make(net.IP, 4) + binary.BigEndian.PutUint32(localIP, ip) + return localIP +} + // TimeToMicrosecondsSinceEpochInt64 converts Go time.Time to a long // representing time since epoch in microseconds, which is used expected // in the Jaeger spans encoded as Thrift. diff --git a/utils/utils_test.go b/utils/utils_test.go index 1df8e0b3..d0f3777d 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -89,3 +89,18 @@ func TestPackIPAsUint32(t *testing.T) { assert.Equal(t, test.out, ip) } } + +func TestUnpackUint32AsIP(t *testing.T) { + tests := []struct { + expected net.IP + in uint32 + }{ + {net.IPv4(1, 2, 3, 4), 1<<24 | 2<<16 | 3<<8 | 4}, + {net.IPv4(127, 0, 0, 1), 127<<24 | 1}, + } + + for _, test := range tests { + ip := UnpackUint32AsIP(test.in) + assert.Equal(t, test.expected, ip.To16()) + } +} diff --git a/zipkin/reporter.go b/zipkin/reporter.go new file mode 100644 index 00000000..c24fa28f --- /dev/null +++ b/zipkin/reporter.go @@ -0,0 +1,47 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + zipkinReporter "github.com/openzipkin/zipkin-go/reporter" + jaeger "github.com/uber/jaeger-client-go" +) + +// Reporter adapts a Zipkin Reporter to a Jaeger reporter +// +// Example: +// +// import zipkinHttp "github.com/openzipkin/zipkin-go/reporter/http" +// ... +// +// zipkinReporter := Reporter{ +// zipkinHttp.NewReporter(zipkinUrl) +// } +// +// Will give you a Jaeger reporter that can be used with the tracer. +type Reporter struct { + zipkinReporter.Reporter +} + +// Report converts the Jaeger span to a Zipkin Span and reports it to the +// Zipkin reporter. +func (r *Reporter) Report(span *jaeger.Span) { + r.Reporter.Send(*jaeger.BuildZipkinV2Span(span)) +} + +// Close proxies to the Zipkin reporter's close method +func (r *Reporter) Close() { + r.Reporter.Close() +} diff --git a/zipkin/reporter_test.go b/zipkin/reporter_test.go new file mode 100644 index 00000000..f9eba573 --- /dev/null +++ b/zipkin/reporter_test.go @@ -0,0 +1,72 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/crossdock/crossdock-go/assert" + zipkinModel "github.com/openzipkin/zipkin-go/model" + zipkinHttp "github.com/openzipkin/zipkin-go/reporter/http" + jaeger "github.com/uber/jaeger-client-go" +) + +func TestZipkinReporter(t *testing.T) { + var spansReceived []zipkinModel.SpanModel + + // Inspired by + // https://github.com/openzipkin/zipkin-go/blob/master/reporter/http/http_test.go + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("expected 'POST' request, got '%s'", r.Method) + } + + if spansReceived != nil { + t.Fatalf("received more than one set of spans") + } + + err := json.NewDecoder(r.Body).Decode(&spansReceived) + if err != nil { + t.Fatalf("unexpected error: %s", err.Error()) + } + })) + defer ts.Close() + + rep := zipkinHttp.NewReporter(ts.URL) + defer rep.Close() + zipkinReporter := &Reporter{ + zipkinHttp.NewReporter(ts.URL), + } + + tracer, closer := jaeger.NewTracer("DOOP", + jaeger.NewConstSampler(true), + zipkinReporter) + + sp := tracer.StartSpan("s1").(*jaeger.Span) + sp.SetTag("test", "abc") + sp.Finish() + + sp2 := tracer.StartSpan("s2").(*jaeger.Span) + sp2.Finish() + + closer.Close() + + assert.Len(t, spansReceived, 2) + assert.Equal(t, "abc", spansReceived[0].Tags["test"]) + assert.Equal(t, "s2", spansReceived[1].Name) +} diff --git a/zipkin_v2_span.go b/zipkin_v2_span.go new file mode 100644 index 00000000..cedaca69 --- /dev/null +++ b/zipkin_v2_span.go @@ -0,0 +1,156 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jaeger + +import ( + "net" + + "github.com/opentracing/opentracing-go/ext" + + zipkinModel "github.com/openzipkin/zipkin-go/model" + "github.com/uber/jaeger-client-go/internal/spanlog" + "github.com/uber/jaeger-client-go/utils" +) + +// BuildZipkinV2Span converts a Jaeger span to a Zipkin v2 span +func BuildZipkinV2Span(span *Span) *zipkinModel.SpanModel { + parentID := zipkinModel.ID(span.context.parentID) + var ptrParentID *zipkinModel.ID + if parentID != 0 { + ptrParentID = &parentID + } + + localEndpoint := &zipkinModel.Endpoint{ + ServiceName: span.tracer.serviceName, + IPv4: utils.UnpackUint32AsIP(span.tracer.hostIPv4), + } + + kind, remoteEndpoint, tags := processTags(span) + + zSpan := &zipkinModel.SpanModel{ + SpanContext: zipkinModel.SpanContext{ + TraceID: zipkinModel.TraceID{ + High: span.context.traceID.High, + Low: span.context.traceID.Low, + }, + ID: zipkinModel.ID(span.context.spanID), + ParentID: ptrParentID, + Debug: span.context.IsDebug(), + }, + Name: span.operationName, + Timestamp: span.startTime, + Duration: span.duration, + Kind: kind, + LocalEndpoint: localEndpoint, + RemoteEndpoint: remoteEndpoint, + Annotations: buildZipkinV2Annotations(span), + Tags: tags, + } + return zSpan +} + +func buildZipkinV2Annotations(span *Span) []zipkinModel.Annotation { + annotations := make([]zipkinModel.Annotation, 0, len(span.logs)) + for _, log := range span.logs { + anno := zipkinModel.Annotation{ + Timestamp: log.Timestamp, + } + if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil { + anno.Value = truncateString(string(content)) + } else { + anno.Value = err.Error() + } + annotations = append(annotations, anno) + } + return annotations +} + +// Handle special tags that get converted to the kind and remote endpoint +// fields, and throw the rest of the tags into a map that becomes the Zipkin +// Tags field. +func processTags(s *Span) (zipkinModel.Kind, *zipkinModel.Endpoint, map[string]string) { + kind := zipkinModel.Undetermined + var endpoint *zipkinModel.Endpoint + tags := make(map[string]string) + + initEndpoint := func() { + if endpoint == nil { + endpoint = &zipkinModel.Endpoint{} + } + } + + for _, tag := range s.tags { + switch tag.key { + case string(ext.PeerHostIPv4): + initEndpoint() + endpoint.IPv4 = convertPeerIPv4(tag.value) + case string(ext.PeerPort): + initEndpoint() + endpoint.Port = convertPeerPort(tag.value) + case string(ext.PeerService): + initEndpoint() + endpoint.ServiceName = convertPeerService(tag.value) + case string(ext.SpanKind): + switch tag.value { + case ext.SpanKindRPCClientEnum: + kind = zipkinModel.Client + case ext.SpanKindRPCServerEnum: + kind = zipkinModel.Server + case ext.SpanKindProducerEnum: + kind = zipkinModel.Producer + case ext.SpanKindConsumerEnum: + kind = zipkinModel.Consumer + } + default: + tags[string(tag.key)] = stringify(tag.value) + } + } + return kind, endpoint, tags +} + +func convertPeerIPv4(value interface{}) net.IP { + if val, ok := value.(string); ok { + if ip := net.ParseIP(val); ip != nil { + return ip + } + } else if val, ok := value.(uint32); ok { + return utils.UnpackUint32AsIP(val) + } else if val, ok := value.(int32); ok { + return utils.UnpackUint32AsIP(uint32(val)) + } + return nil +} + +func convertPeerPort(value interface{}) uint16 { + if val, ok := value.(string); ok { + if port, err := utils.ParsePort(val); err == nil { + return port + } + } + if val, ok := value.(uint16); ok { + return val + } + if val, ok := value.(int); ok { + return uint16(val) + } + return 0 +} + +func convertPeerService(value interface{}) string { + if val, ok := value.(string); ok { + return val + } + return "" +} diff --git a/zipkin_v2_span_test.go b/zipkin_v2_span_test.go new file mode 100644 index 00000000..9a628706 --- /dev/null +++ b/zipkin_v2_span_test.go @@ -0,0 +1,159 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jaeger + +import ( + "encoding/binary" + "encoding/json" + "testing" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + zipkinModel "github.com/openzipkin/zipkin-go/model" + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-client-go/utils" +) + +// Creates a simple Zipkin span from a Jaeger span. The mutator func is +// provided by the test function to modify the generated Jaeger span before it +// is finished and converted to Zipkin's span type. +func createTestZipkinV2Span(mutator func(*Span)) *zipkinModel.SpanModel { + tracer, closer := NewTracer("DOOP", + NewConstSampler(true), + NewNullReporter()) + defer closer.Close() + + sp := tracer.StartSpan("s1").(*Span) + mutator(sp) + sp.Finish() + + return BuildZipkinV2Span(sp) +} + +func TestZipkinV2TagConversion(t *testing.T) { + zSpan := createTestZipkinV2Span(func(sp *Span) { + sp.SetTag("a", 1) + sp.SetTag("b", "test") + }) + + // There are two built-in tags: "sampler.type" and "sampler.param" + assert.Len(t, zSpan.Tags, 4) + + // Converts ints to strings + assert.Equal(t, "1", zSpan.Tags["a"]) + assert.Equal(t, "test", zSpan.Tags["b"]) +} + +func TestZipkinV2AnnotationConversion(t *testing.T) { + zSpan := createTestZipkinV2Span(func(sp *Span) { + sp.LogEvent("something happened") + sp.LogKV("event", "unknown", "error", "none") + }) + + assert.Len(t, zSpan.Annotations, 2) + + assert.Equal(t, "something happened", zSpan.Annotations[0].Value) + + var fields map[string]string + if err := json.Unmarshal([]byte(zSpan.Annotations[1].Value), &fields); err != nil { + assert.FailNow(t, "annotation was not json", err.Error()) + } + + assert.Equal(t, "unknown", fields["event"]) + assert.Equal(t, "none", fields["error"]) +} + +func TestZipkinV2KindConversion(t *testing.T) { + tests := []struct { + jaegerKind ext.SpanKindEnum + zipkinKind zipkinModel.Kind + }{ + { + jaegerKind: ext.SpanKindRPCClientEnum, + zipkinKind: zipkinModel.Client, + }, + { + jaegerKind: ext.SpanKindRPCServerEnum, + zipkinKind: zipkinModel.Server, + }, + { + jaegerKind: ext.SpanKindProducerEnum, + zipkinKind: zipkinModel.Producer, + }, + { + jaegerKind: ext.SpanKindConsumerEnum, + zipkinKind: zipkinModel.Consumer, + }, + } + for _, test := range tests { + zSpan := createTestZipkinV2Span(func(sp *Span) { + sp.SetTag(string(ext.SpanKind), test.jaegerKind) + }) + + assert.Equal(t, test.zipkinKind, zSpan.Kind) + } +} + +func TestZipkinV2LocalEndpointConversion(t *testing.T) { + zSpan := createTestZipkinV2Span(func(sp *Span) {}) + + hostIP, err := utils.HostIP() + if err != nil { + assert.FailNow(t, "Could not determine local ip: ", err.Error()) + } + + assert.Equal(t, hostIP.String(), zSpan.LocalEndpoint.IPv4.String()) +} + +func TestZipkinV2TraceIDConversion(t *testing.T) { + var jaegerTraceID TraceID + + zSpan := createTestZipkinV2Span(func(sp *Span) { + jaegerTraceID = sp.context.TraceID() + }) + + assert.Equal(t, jaegerTraceID.Low, zSpan.TraceID.Low) + assert.Equal(t, jaegerTraceID.High, zSpan.TraceID.High) +} + +func TestZipkinV2RemoteEndpointConversion(t *testing.T) { + zSpan := createTestZipkinV2Span(func(sp *Span) { + ext.PeerService.Set(sp, "peer") + ext.PeerPort.Set(sp, 80) + ext.PeerHostIPv4.Set(sp, 2130706433) + }) + + assert.NotNil(t, zSpan.RemoteEndpoint) + assert.Equal(t, uint32(2130706433), binary.BigEndian.Uint32(zSpan.RemoteEndpoint.IPv4)) + assert.Equal(t, uint16(80), zSpan.RemoteEndpoint.Port) + assert.Equal(t, "peer", zSpan.RemoteEndpoint.ServiceName) +} + +func TestZipkinV2ParentSpanID(t *testing.T) { + tracer, closer := NewTracer("DOOP", + NewConstSampler(true), + NewNullReporter()) + defer closer.Close() + + sp := tracer.StartSpan("s1") + sp2 := tracer.StartSpan("s2", opentracing.ChildOf(sp.Context())).(*Span) + sp2.Finish() + sp.Finish() + + zSpan := BuildZipkinV2Span(sp2) + + assert.NotNil(t, zSpan.ParentID) + assert.Equal(t, *zSpan.ParentID, zipkinModel.ID(sp.(*Span).context.SpanID())) +}