diff --git a/crossdock/server/endtoend.py b/crossdock/server/endtoend.py
index 29058708..d5b317a6 100644
--- a/crossdock/server/endtoend.py
+++ b/crossdock/server/endtoend.py
@@ -28,7 +28,7 @@
     SAMPLER_TYPE_REMOTE,
 )
 from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
-from jaeger_client.reporter import Reporter
+from jaeger_client.reporter import ThriftReporter
 from jaeger_client.throttler import RemoteThrottler
 from jaeger_client.tracer import Tracer
 
@@ -67,8 +67,8 @@ def __init__(self):
         init_sampler = cfg.sampler
         channel = self.local_agent_sender
 
-        reporter = Reporter(channel=channel,
-                            flush_interval=cfg.reporter_flush_interval)
+        reporter = ThriftReporter(channel=channel,
+                                  flush_interval=cfg.reporter_flush_interval)
 
         remote_sampler = RemoteControlledSampler(
             channel=channel,
diff --git a/jaeger_client/config.py b/jaeger_client/config.py
index c8aaf249..33cf06fb 100644
--- a/jaeger_client/config.py
+++ b/jaeger_client/config.py
@@ -21,12 +21,13 @@
 import opentracing
 from opentracing.propagation import Format
 from . import Tracer
-from .local_agent_net import LocalAgentSender
+from .local_agent_net import LocalAgentReader, LocalAgentSender
 from .throttler import RemoteThrottler
 from .reporter import (
-    Reporter,
+    ThriftReporter,
     CompositeReporter,
     LoggingReporter,
+    ZipkinV2Reporter,
 )
 from .sampler import (
     ConstSampler,
@@ -115,6 +116,7 @@ def _validate_config(self, config):
             'sampler',
             'tags',
             'enabled',
+            'reporter_type',
             'reporter_batch_size',
             'propagation',
             'max_tag_value_length',
@@ -123,7 +125,9 @@
             'trace_id_header',
             'baggage_header_prefix',
             'service_name',
-            'throttler']
+            'throttler',
+            'headers',
+            'zipkin_spans_url']
         config_keys = config.keys()
         unexpected_config_keys = [k for k in config_keys if k not in allowed_keys]
         if unexpected_config_keys:
@@ -146,6 +150,13 @@ def error_reporter(self):
     def enabled(self):
         return get_boolean(self.config.get('enabled', True), True)
 
+    @property
+    def reporter_type(self):
+        rt = self.config.get('reporter_type', 'jaeger').lower()
+        if rt not in ('jaeger', 'zipkin_v2'):
+            raise ValueError('config reporter_type must be "jaeger" or "zipkin_v2"')
+        return rt
+
     @property
     def reporter_batch_size(self):
         return int(self.config.get('reporter_batch_size', 10))
@@ -312,6 +323,14 @@ def throttler_refresh_interval(self):
         except:
             return DEFAULT_THROTTLER_REFRESH_INTERVAL
 
+    @property
+    def headers(self):
+        return self.config.get('headers', {})
+
+    @property
+    def zipkin_spans_url(self):
+        return self.config.get('zipkin_spans_url', 'http://localhost:9411/api/v2/spans')
+
     @staticmethod
     def initialized():
         with Config._initialized_lock:
@@ -353,14 +372,32 @@ def new_tracer(self, io_loop=None):
                 max_operations=self.max_operations)
         logger.info('Using sampler %s', sampler)
 
-        reporter = Reporter(
-            channel=channel,
-            queue_capacity=self.reporter_queue_size,
-            batch_size=self.reporter_batch_size,
-            flush_interval=self.reporter_flush_interval,
-            logger=logger,
-            metrics_factory=self._metrics_factory,
-            error_reporter=self.error_reporter)
+        if self.reporter_type == 'jaeger':
+            reporter = ThriftReporter(
+                channel=channel,
+                queue_capacity=self.reporter_queue_size,
+                batch_size=self.reporter_batch_size,
+                flush_interval=self.reporter_flush_interval,
+                logger=logger,
+                metrics_factory=self._metrics_factory,
+                error_reporter=self.error_reporter
+            )
+        elif self.reporter_type == 'zipkin_v2':
+            kwargs = {}
+            if self.headers:
+                kwargs['headers'] = self.headers
+
+            reporter = ZipkinV2Reporter(
+                channel=channel,
+                spans_url=self.zipkin_spans_url,
+                queue_capacity=self.reporter_queue_size,
+                batch_size=self.reporter_batch_size,
+                flush_interval=self.reporter_flush_interval,
+                logger=logger,
+                metrics_factory=self._metrics_factory,
+                error_reporter=self.error_reporter,
+                **kwargs
+            )
 
         if self.logging:
             reporter = CompositeReporter(reporter, LoggingReporter(logger))
@@ -405,11 +442,20 @@ def _initialize_global_tracer(self, tracer):
     def _create_local_agent_channel(self, io_loop):
         """
         Create an out-of-process channel communicating to local jaeger-agent.
-        Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
-        via JSON HTTP.
+        If using the Jaeger backend, spans are submitted as SOCK_DGRAM Thrift
+        and the sampling strategy is polled via JSON HTTP.
 
         :param self: instance of Config
         """
+        if self.reporter_type == 'zipkin_v2':
+            logger.info('Initializing Jaeger Tracer with Zipkin V2 reporter')
+            return LocalAgentReader(
+                host=self.local_agent_reporting_host,
+                sampling_port=self.local_agent_sampling_port,
+                reporting_port=self.local_agent_reporting_port,
+                throttling_port=self.throttler_port,
+                io_loop=io_loop
+            )
         logger.info('Initializing Jaeger Tracer with UDP reporter')
         return LocalAgentSender(
             host=self.local_agent_reporting_host,
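For reference, selecting the new reporter through `Config` looks roughly like the sketch below. The collector URL, auth header, sampler choice, and service name are placeholders for illustration, not values taken from this patch:

```python
from jaeger_client import Config

config = Config(
    config={
        'reporter_type': 'zipkin_v2',                                    # new key added above
        'zipkin_spans_url': 'http://zipkin.example.com:9411/api/v2/spans',
        'headers': {'X-Auth-Token': 'example-token'},                    # forwarded on every POST
        'sampler': {'type': 'const', 'param': 1},
    },
    service_name='my-service',
)
tracer = config.initialize_tracer()
```

With `reporter_type` left at its default of `'jaeger'`, behaviour is unchanged and spans continue to flow to the local agent over UDP Thrift.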
diff --git a/jaeger_client/local_agent_net.py b/jaeger_client/local_agent_net.py
index 5266fc51..93f312b7 100644
--- a/jaeger_client/local_agent_net.py
+++ b/jaeger_client/local_agent_net.py
@@ -53,16 +53,12 @@ def request_throttling_credits(self,
         ] + [('operations', op) for op in operations])
 
 
-class LocalAgentSender(TBufferedTransport):
+class LocalAgentReader(object):
     """
-    LocalAgentSender implements everything necessary to communicate with
-    local jaeger-agent. This class is designed to work in tornado and
-    non-tornado environments. If in torndado, pass in the ioloop, if not
-    then LocalAgentSender will create one for itself.
-
-    NOTE: LocalAgentSender derives from TBufferedTransport. This will buffer
-    up all written data until flush() is called. Flush gets called at the
-    end of the batch span submission call.
+    LocalAgentReader implements what is necessary to obtain sampling strategies
+    and throttling credits from the local jaeger-agent. This class is designed
+    to work in tornado and non-tornado environments. If in tornado, pass in the
+    ioloop; if not, LocalAgentReader will create one for itself.
     """
 
     def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None):
@@ -77,11 +73,6 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling
         if throttling_port:
             self.throttling_http = LocalAgentHTTP(host, throttling_port)
 
-        # UDP reporting - this will only get written to after our flush() call.
-        # We are buffering things up because we are a TBufferedTransport.
-        udp = TUDPTransport(host, reporting_port)
-        TBufferedTransport.__init__(self, udp)
-
     def _create_new_thread_loop(self):
         """
         Create a daemonized thread that will run Tornado IOLoop.
@@ -92,10 +83,6 @@ def _create_new_thread_loop(self):
         self._thread_loop.start()
         return self._thread_loop._io_loop
 
-    def readFrame(self):
-        """Empty read frame that is never ready"""
-        return Future()
-
     # Pass-through for HTTP sampling strategies request.
     def request_sampling_strategy(self, *args, **kwargs):
         return self.local_agent_http.request_sampling_strategy(*args, **kwargs)
@@ -103,3 +90,26 @@ def request_sampling_strategy(self, *args, **kwargs):
     # Pass-through for HTTP throttling credit request.
     def request_throttling_credits(self, *args, **kwargs):
         return self.throttling_http.request_throttling_credits(*args, **kwargs)
+
+
+class LocalAgentSender(LocalAgentReader, TBufferedTransport):
+    """
+    LocalAgentSender implements everything necessary to report spans to
+    the local jaeger-agent.
+
+    NOTE: LocalAgentSender derives from TBufferedTransport. This will buffer
+    up all written data until flush() is called. Flush gets called at the
+    end of the batch span submission call.
+    """
+
+    def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None):
+        LocalAgentReader.__init__(self, host, sampling_port, reporting_port,
+                                  io_loop, throttling_port)
+        # UDP reporting - this will only get written to after our flush() call.
+        # We are buffering things up because we are a TBufferedTransport.
+        udp = TUDPTransport(host, reporting_port)
+        TBufferedTransport.__init__(self, udp)
+
+    def readFrame(self):
+        """Empty read frame that is never ready"""
+        return Future()
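As a point of reference, the HTTP pass-throughs on the new reader can be exercised on their own. This is a rough sketch; the agent host, ports, and the argument names of the sampling call are assumptions based on this codebase's existing `LocalAgentHTTP` helper and the default agent ports:

```python
from tornado.ioloop import IOLoop
from jaeger_client.local_agent_net import LocalAgentReader

# HTTP-only channel: enough for sampling strategies and throttling credits.
# The UDP Thrift transport now lives only in LocalAgentSender below.
reader = LocalAgentReader(host='localhost', sampling_port=5778,
                          reporting_port=6831, io_loop=IOLoop.current())
strategy_future = reader.request_sampling_strategy('my-service', timeout=15)
```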
diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py
index 1fedb995..f1c9b1d5 100644
--- a/jaeger_client/reporter.py
+++ b/jaeger_client/reporter.py
@@ -14,10 +14,12 @@
 
 from __future__ import absolute_import
 
+import json
 import logging
 import threading
 
 import tornado.gen
+import tornado.httpclient
 import tornado.ioloop
 import tornado.queues
 import socket
@@ -25,6 +27,7 @@
 from .constants import DEFAULT_FLUSH_INTERVAL
 from . import thrift
 from . import ioloop_util
+from . import zipkin_v2
 from .metrics import Metrics, LegacyMetricsFactory
 from .utils import ErrorReporter
 
@@ -73,8 +76,12 @@ def report_span(self, span):
         self.logger.info('Reporting span %s', span)
 
 
-class Reporter(NullReporter):
-    """Receives completed spans from Tracer and submits them out of process."""
+class QueueReporter(NullReporter):
+    """
+    Base reporter class that queues and batches spans received from the Tracer
+    for transmission to a backend service. Not intended for direct use; subclasses
+    must implement create_agent(), make_batch(), and _send().
+    """
 
     def __init__(self, channel, queue_capacity=100, batch_size=10,
                  flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
                  error_reporter=None, metrics=None, metrics_factory=None,
@@ -97,14 +104,13 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
         """
         from threading import Lock
 
-        self._channel = channel
        self.queue_capacity = queue_capacity
         self.batch_size = batch_size
         self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
         self.metrics = ReporterMetrics(self.metrics_factory)
         self.error_reporter = error_reporter or ErrorReporter(Metrics())
         self.logger = kwargs.get('logger', default_logger)
-        self.agent = Agent.Client(self._channel, self)
+        self.agent = self.create_agent(channel)
 
         if queue_capacity < batch_size:
             raise ValueError('Queue capacity cannot be less than batch size')
@@ -123,6 +129,9 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
         self._process_lock = Lock()
         self._process = None
 
+    def create_agent(self, channel):
+        raise NotImplementedError()
+
     def set_process(self, service_name, tags, max_length):
         with self._process_lock:
             self._process = thrift.make_process(
@@ -177,14 +186,8 @@ def _consume_queue(self):
                 self.metrics.reporter_queue_length(self.queue.qsize())
         self.logger.info('Span publisher exited')
 
-    # method for protocol factory
-    def getProtocol(self, transport):
-        """
-        Implements Thrift ProtocolFactory interface
-        :param: transport:
-        :return: Thrift compact protocol
-        """
-        return TCompactProtocol.TCompactProtocol(transport)
+    def make_batch(self, spans, process):
+        raise NotImplementedError()
 
     @tornado.gen.coroutine
     def _submit(self, spans):
@@ -195,7 +198,7 @@ def _submit(self, spans):
         if not process:
             return
         try:
-            batch = thrift.make_jaeger_batch(spans=spans, process=process)
+            batch = self.make_batch(spans, process)
             yield self._send(batch)
             self.metrics.reporter_success(len(spans))
         except socket.error as e:
@@ -209,11 +212,7 @@ def _submit(self, spans):
 
     @tornado.gen.coroutine
     def _send(self, batch):
-        """
-        Send batch of spans out via thrift transport. Any exceptions thrown
-        will be caught above in the exception handler of _submit().
-        """
-        return self.agent.emitBatch(batch)
+        raise NotImplementedError()
 
     def close(self):
         """
@@ -275,3 +274,80 @@ def on_close(_):
             f.add_done_callback(on_close)
 
         return future
+
+
+class ThriftReporter(QueueReporter):
+    """
+    Receives completed spans from the Tracer and submits them out of process
+    to the local jaeger-agent.
+    """
+
+    def __init__(self, channel, queue_capacity=100, batch_size=10,
+                 flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
+                 error_reporter=None, metrics=None, metrics_factory=None,
+                 **kwargs):
+        QueueReporter.__init__(self, channel, queue_capacity, batch_size, flush_interval,
+                               io_loop, error_reporter, metrics, metrics_factory, **kwargs)
+
+    def create_agent(self, channel):
+        return Agent.Client(channel, self)
+
+    def make_batch(self, spans, process):
+        return thrift.make_jaeger_batch(spans=spans, process=process)
+
+    # method for protocol factory
+    def getProtocol(self, transport):
+        """
+        Implements Thrift ProtocolFactory interface
+        :param: transport:
+        :return: Thrift compact protocol
+        """
+        return TCompactProtocol.TCompactProtocol(transport)
+
+    @tornado.gen.coroutine
+    def _send(self, batch):
+        """
+        Send batch of spans out via thrift transport. Any exceptions thrown
+        will be caught above in the exception handler of _submit().
+        """
+        return self.agent.emitBatch(batch)
+
+
+Reporter = ThriftReporter  # backward-compatible alias
+
+
+class ZipkinV2Reporter(QueueReporter):
+    """Receives completed spans from the Tracer and submits them to Zipkin's /api/v2/spans endpoint."""
+
+    def __init__(self, channel, spans_url, queue_capacity=100, batch_size=10,
+                 flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, error_reporter=None,
+                 metrics=None, metrics_factory=None, **kwargs):
+        self.spans_url = spans_url
+        self.headers = kwargs.get('headers', {})
+        QueueReporter.__init__(self, channel, queue_capacity, batch_size, flush_interval,
+                               io_loop, error_reporter, metrics, metrics_factory, **kwargs)
+
+    def create_agent(self, channel):
+        return None
+
+    def make_batch(self, spans, process):
+        return zipkin_v2.make_zipkin_v2_batch(spans=spans, process=process)
+
+    @tornado.gen.coroutine
+    def _send(self, batch):
+        """
+        Send batch of spans out via AsyncHTTPClient. Any exceptions thrown
+        will be caught above in the exception handler of _submit().
+        """
+        client = tornado.httpclient.AsyncHTTPClient()
+        headers = {'content-type': 'application/json'}
+        if self.headers:
+            headers.update(self.headers)
+
+        request = tornado.httpclient.HTTPRequest(
+            method='POST',
+            url=self.spans_url,
+            headers=headers,
+            body=json.dumps(batch)
+        )
+        yield client.fetch(request)
diff --git a/jaeger_client/zipkin_v2.py b/jaeger_client/zipkin_v2.py
new file mode 100644
index 00000000..9e934f2f
--- /dev/null
+++ b/jaeger_client/zipkin_v2.py
@@ -0,0 +1,161 @@
+# Copyright (c) 2018 Uber Technologies, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from binascii import hexlify
+from struct import pack
+import json
+
+from opentracing.ext import tags as ext_tags
+
+from jaeger_client.thrift_gen.jaeger.ttypes import TagType
+from .thrift import timestamp_micros
+from . import constants
+
+
+def make_zipkin_v2_batch(spans=None, process=None):
+    batched_spans = []
+    for span in spans:
+        with span.update_lock:
+            v2_span = to_zipkin_v2_span(span, process)
+            batched_spans.append(v2_span)
+    return batched_spans
+
+
+def to_zipkin_v2_span(span, process):
+    """Converts jaeger thrift span and process information to Zipkin's API v2 span format"""
+    v2_span = dict(
+        traceId=to_encoded_id(span.trace_id),
+        id=to_encoded_id(span.span_id),
+        timestamp=timestamp_micros(span.start_time),
+        debug=span.is_debug(),
+    )
+
+    if span.end_time:
+        v2_span['duration'] = timestamp_micros(span.end_time - span.start_time)
+
+    name = getattr(span, 'operation_name', '')
+    if name:
+        v2_span['name'] = name
+
+    parent_id = getattr(span, 'parent_id', 0)
+    if parent_id:
+        v2_span['parentId'] = to_encoded_id(parent_id)
+
+    kind = get_span_kind(span)
+    if kind:
+        v2_span['kind'] = kind
+
+    local_endpoint = get_local_endpoint(process)
+    if local_endpoint:
+        v2_span['localEndpoint'] = local_endpoint
+
+    remote_endpoint = get_remote_endpoint(process)
+    if remote_endpoint:
+        v2_span['remoteEndpoint'] = remote_endpoint
+
+    tags = format_span_tags(span)
+    if tags:
+        v2_span['tags'] = tags
+
+    annotations = [log_to_annotation(l) for l in span.logs]
+    if annotations:
+        v2_span['annotations'] = annotations
+
+    return v2_span
+
+
+def to_encoded_id(jaeger_id):
+    return hexlify(pack('>Q', jaeger_id)).decode('ascii')
+
+
+def get_span_kind(span):
+    kind = None
+    for tag in span.tags:
+        if tag.key == ext_tags.SPAN_KIND:
+            kind = {ext_tags.SPAN_KIND_RPC_CLIENT: 'CLIENT',
+                    ext_tags.SPAN_KIND_RPC_SERVER: 'SERVER'}.get(tag.vStr)
+    return kind
+
+
+ttype_attr_map = {TagType.BINARY: 'vBinary', TagType.BOOL: 'vBool', TagType.DOUBLE: 'vDouble',
+                  TagType.LONG: 'vLong', TagType.STRING: 'vStr'}
+
+
+def get_tag_value(tag):
+    attr = ttype_attr_map[tag.vType]
+    return getattr(tag, attr, None)
+
+
+def get_local_endpoint(process):
+    local_endpoint = {}
+    service_name = getattr(process, 'serviceName', '')
+    if service_name:
+        local_endpoint['serviceName'] = service_name
+    if process.tags:
+        for tag in process.tags:
+            if tag.key == constants.JAEGER_IP_TAG_KEY:
+                ipv4 = get_tag_value(tag)
+                if ipv4:
+                    local_endpoint['ipv4'] = ipv4
+                break
+    return local_endpoint
+
+
+def get_remote_endpoint(process):
+    tag_key_map = [(ext_tags.PEER_SERVICE, 'serviceName'), (ext_tags.PEER_HOST_IPV4, 'ipv4'),
+                   (ext_tags.PEER_HOST_IPV6, 'ipv6'), (ext_tags.PEER_PORT, 'port')]
+
+    remote_endpoint = {}
+    if process.tags:
+        for tag in process.tags:
+            for tag_key, endpoint_key in list(tag_key_map):
+                if tag.key == tag_key:
+                    tag_key_map.remove((tag_key, endpoint_key))
+                    value = get_tag_value(tag)
+                    if value:
+                        if tag.key == ext_tags.PEER_PORT:
+                            value = int(value)
+                        remote_endpoint[endpoint_key] = value
+                    continue
+            if not tag_key_map:
+                break
+
+    return remote_endpoint
+
+
+redundant_tags = (ext_tags.PEER_HOST_IPV4, ext_tags.PEER_HOST_IPV6, ext_tags.PEER_PORT,
+                  ext_tags.PEER_SERVICE, ext_tags.SPAN_KIND, ext_tags.SPAN_KIND_RPC_CLIENT,
+                  ext_tags.SPAN_KIND_RPC_SERVER)
+
+
+def format_span_tags(span):
+    formatted = {}
+    for tag in span.tags:
+        if tag.key not in redundant_tags:
+            k, v = format_span_tag(tag)
+            if v:
+                formatted[k] = v
+    return formatted
+
+
+def format_span_tag(tag):
+    key = tag.key
+    value = get_tag_value(tag)
+    return key, value
+
+
+def log_to_annotation(log):
+    annotation = dict(timestamp=log.timestamp)
+    tags = dict([format_span_tag(t) for t in log.fields])
+    annotation['value'] = json.dumps(tags)
+    return annotation
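To illustrate the ID encoding and the rough shape of a converted span (the values mirror the unit tests further down):

```python
from jaeger_client.zipkin_v2 import to_encoded_id

# 64-bit Jaeger IDs become zero-padded, 16-character, lowercase hex strings.
assert to_encoded_id(1) == '0000000000000001'
assert to_encoded_id(0xffffffffffffffff) == 'ffffffffffffffff'

# to_zipkin_v2_span() then returns a plain dict ready for json.dumps, roughly:
# {'traceId': '0000000000000001', 'id': '0000000000000002',
#  'parentId': '0000000000000001', 'name': 'operation', 'kind': 'CLIENT',
#  'timestamp': 1514764800000000, 'duration': 20000000, 'debug': False,
#  'localEndpoint': {'serviceName': 'x', 'ipv4': '127.0.0.1'},
#  'tags': {'SomeKey': '123123'}}
```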
diff --git a/tests/test_config.py b/tests/test_config.py
index fa75cca9..7c7fd380 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -144,3 +144,22 @@ def test_initialize_tracer(self):
         tracer = c.initialize_tracer()
 
         assert opentracing.tracer == tracer
+
+    def test_reporter_type(self):
+        c = Config({'reporter_type': 'JaEGer'}, service_name='x')
+        assert c.reporter_type == 'jaeger'
+        c = Config({'reporter_type': 'ZiPKIn_V2'}, service_name='x')
+        assert c.reporter_type == 'zipkin_v2'
+
+    def test_headers(self):
+        c = Config({}, service_name='x')
+        assert c.headers == {}
+        cfg_dict = {'headers': {'HeaderOne': 'OneVal', 'HeaderTwo': 'TwoVal'}}
+        c = Config(cfg_dict, service_name='x')
+        assert c.headers == cfg_dict['headers']
+
+    def test_zipkin_spans_url(self):
+        c = Config({}, service_name='x')
+        assert c.zipkin_spans_url == 'http://localhost:9411/api/v2/spans'
+        c = Config({'zipkin_spans_url': 'someurl'}, service_name='x')
+        assert c.zipkin_spans_url == 'someurl'
diff --git a/tests/test_reporter.py b/tests/test_reporter.py
index b7e8e69e..d166985a 100644
--- a/tests/test_reporter.py
+++ b/tests/test_reporter.py
@@ -30,7 +30,7 @@
 from jaeger_client.utils import ErrorReporter
 from tornado.ioloop import IOLoop
 from tornado.testing import AsyncTestCase, gen_test
-from jaeger_client.reporter import Reporter
+from jaeger_client.reporter import ThriftReporter
 from jaeger_client.ioloop_util import future_result
 
 
@@ -132,7 +132,7 @@ def _incr_count(self, key, value):
         self.counters[key] = value + self.counters.get(key, 0)
 
 
-class ReporterTest(AsyncTestCase):
+class ThriftReporterTest(AsyncTestCase):
     @pytest.yield_fixture
     def thread_loop(self):
         yield
@@ -149,13 +149,13 @@ def _new_span(name):
 
     @staticmethod
     def _new_reporter(batch_size, flush=None, queue_cap=100):
-        reporter = Reporter(channel=mock.MagicMock(),
-                            io_loop=IOLoop.current(),
-                            batch_size=batch_size,
-                            flush_interval=flush,
-                            metrics_factory=FakeMetricsFactory(),
-                            error_reporter=HardErrorReporter(),
-                            queue_capacity=queue_cap)
+        reporter = ThriftReporter(channel=mock.MagicMock(),
+                                  io_loop=IOLoop.current(),
+                                  batch_size=batch_size,
+                                  flush_interval=flush,
+                                  metrics_factory=FakeMetricsFactory(),
+                                  error_reporter=HardErrorReporter(),
+                                  queue_capacity=queue_cap)
         reporter.set_process('service', {}, max_length=0)
         sender = FakeSender()
         reporter._send = sender
diff --git a/tests/test_zipkin_v2.py b/tests/test_zipkin_v2.py
new file mode 100644
index 00000000..b54e355e
--- /dev/null
+++ b/tests/test_zipkin_v2.py
@@ -0,0 +1,160 @@
+# Copyright (c) 2018 Uber Technologies, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from time import time
+import json
+
+import mock
+
+from jaeger_client.zipkin_v2 import make_zipkin_v2_batch, to_zipkin_v2_span
+from jaeger_client import Span, SpanContext, ConstSampler
+import jaeger_client.thrift_gen.jaeger.ttypes as ttypes
+from jaeger_client.constants import JAEGER_IP_TAG_KEY
+from jaeger_client.thrift import timestamp_micros
+from opentracing.ext import tags as ext_tags
+from jaeger_client import thrift
+
+
+def to_tag(key, value):
+    return thrift.make_string_tag(key=key, value=value, max_length=100)
+
+
+def test_to_v2_span_no_parent_nor_duration_nor_tags_nor_annotations(tracer):
+    start_time = time()
+    ctx = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1)
+    span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer)
+    process = ttypes.Process(serviceName='x')
+    zip_span = to_zipkin_v2_span(span, process)
+    expected = dict(debug=False, id='0000000000000001',
+                    traceId='0000000000000001',
+                    localEndpoint=dict(serviceName='x'),
+                    name='operation', timestamp=timestamp_micros(start_time))
+    assert zip_span == expected
+
+
+def test_to_v2_span_with_parent_no_duration_nor_tags_nor_annotations(tracer):
+    start_time = time()
+    ctx = SpanContext(trace_id=1, span_id=2, parent_id=1, flags=1)
+    span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer)
+    process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')])
+    zip_span = to_zipkin_v2_span(span, process)
+    expected = dict(debug=False, id='0000000000000002',
+                    parentId='0000000000000001', traceId='0000000000000001',
+                    localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'),
+                    name='operation', timestamp=timestamp_micros(start_time))
+    assert zip_span == expected
+
+
+def test_to_v2_span_with_parent_and_duration_and_tags_no_annotations(tracer):
+    start_time = time()
+    ctx = SpanContext(trace_id=1, span_id=2, parent_id=1, flags=1)
+    span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer)
+    span.set_tag('SomeKey', '123123')
+    span.set_tag(ext_tags.SPAN_KIND, ext_tags.SPAN_KIND_RPC_CLIENT)
+    end_time = start_time + 20
+    span.end_time = end_time
+    process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')])
+    zip_span = to_zipkin_v2_span(span, process)
+    expected = dict(kind='CLIENT', debug=False, id='0000000000000002',
+                    parentId='0000000000000001', traceId='0000000000000001',
+                    localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'),
+                    name='operation', timestamp=timestamp_micros(start_time),
+                    duration=20000000, tags=dict(SomeKey='123123'))
+    assert zip_span == expected
+
+
+def test_to_v2_span_with_parent_and_duration_and_tags_and_annotations(tracer):
+    start_time = time()
+    ctx = SpanContext(trace_id=18446744073709551614, span_id=18446744073709551615,
+                      parent_id=18446744073709551614, flags=1)
+    span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer)
+    span.set_tag('SomeKey', '123123')
+    span.set_tag(ext_tags.SPAN_KIND, ext_tags.SPAN_KIND_RPC_SERVER)
+    event_time = time() + 3
+    span.log_kv({'SomeTagKey': 'SomeTagValue'}, timestamp=event_time)
+    end_time = start_time + 20
+    span.end_time = end_time
+    process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')])
+    zip_span = to_zipkin_v2_span(span, process)
+    expected = dict(kind='SERVER', debug=False, id='ffffffffffffffff',
+                    parentId='fffffffffffffffe', traceId='fffffffffffffffe',
+                    localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'),
+                    name='operation', timestamp=timestamp_micros(start_time),
+                    duration=20000000, tags=dict(SomeKey='123123'),
+                    annotations=[dict(timestamp=timestamp_micros(event_time),
+                                      value=json.dumps(dict(SomeTagKey='SomeTagValue')))])
+    assert zip_span == expected
+
+
+def test_to_v2_span_remote_with_parent_and_duration_and_tags_and_annotations(tracer):
+    start_time = time()
+    ctx = SpanContext(trace_id=9205364235243340812, span_id=9830726033745040073,
+                      parent_id=9205364235243340812, flags=1)
+    span = Span(context=ctx, operation_name='operation', start_time=start_time, tracer=tracer)
+    span.set_tag('SomeKey', '123123')
+    event_time = time() + 3
+    span.log_kv({'SomeTagKey': 'SomeTagValue1'}, timestamp=event_time)
+    span.log_kv({'SomeOtherTagKey': 'SomeTagValue2'}, timestamp=event_time + 1)
+    span.log_kv({'SomeAdditionalTagKey': 'SomeTagValue3'}, timestamp=event_time + 2)
+    end_time = start_time + 20
+    span.end_time = end_time
+    process = ttypes.Process(serviceName='x', tags=[
+        to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1'),
+        to_tag(ext_tags.PEER_HOST_IPV4, '192.168.34.1'),
+        to_tag(ext_tags.PEER_HOST_IPV6, '::1'),
+        to_tag(ext_tags.PEER_SERVICE, 'remote_operation'),
+        to_tag(ext_tags.PEER_PORT, 99776),
+        to_tag('SomeOtherTag', 'TagValue')
+    ])
+
+    def is_debug(*args):  # simultaneously sampled and debug
+        return True
+
+    span.is_debug = is_debug
+    zip_span = to_zipkin_v2_span(span, process)
+
+    expected = dict(debug=True, id='886dc13e058ed2c9',
+                    parentId='7fc005fff5c3c40c', traceId='7fc005fff5c3c40c',
+                    localEndpoint=dict(serviceName='x', ipv4='127.0.0.1'),
+                    remoteEndpoint=dict(serviceName='remote_operation',
+                                        ipv4='192.168.34.1', ipv6='::1',
+                                        port=99776),
+                    name='operation', timestamp=timestamp_micros(start_time),
+                    duration=20000000, tags=dict(SomeKey='123123'),
+                    annotations=[
+                        dict(timestamp=timestamp_micros(event_time),
+                             value=json.dumps(dict(SomeTagKey='SomeTagValue1'))),
+                        dict(timestamp=timestamp_micros(event_time + 1),
+                             value=json.dumps(dict(SomeOtherTagKey='SomeTagValue2'))),
+                        dict(timestamp=timestamp_micros(event_time + 2),
+                             value=json.dumps(dict(SomeAdditionalTagKey='SomeTagValue3'))),
+                    ])
+    assert zip_span == expected
+
+
+def test_make_zipkin_v2_batch(tracer):
+    ctx_one = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1)
+    span_one = Span(context=ctx_one, operation_name='operation', start_time=time(), tracer=tracer)
+    ctx_two = SpanContext(trace_id=1, span_id=2, parent_id=1, flags=1)
+    span_two = Span(context=ctx_two, operation_name='operation', start_time=time(), tracer=tracer)
+    ctx_three = SpanContext(trace_id=1, span_id=3, parent_id=2, flags=1)
+    span_three = Span(context=ctx_three, operation_name='operation', start_time=time(), tracer=tracer)
+    process = ttypes.Process(serviceName='x', tags=[to_tag(JAEGER_IP_TAG_KEY, '127.0.0.1')])
+    batch = make_zipkin_v2_batch([span_one, span_two, span_three], process)
+    assert len(batch) == 3
+    assert batch[0]['id'] == '0000000000000001'
+    assert batch[1]['id'] == '0000000000000002'
+    assert batch[2]['id'] == '0000000000000003'