Skip to content

Commit

Permalink
Add Zipkin API V2 Reporter
Browse files Browse the repository at this point in the history
Expands span reporter selection by breaking out Reporter into base
QueueReporter class and using reporting method-specific ThriftReporter
and ZipkinV2Reporter.  Also breaks LocalAgentSender into two classes
including LocalAgentReader for ThriftReporter only usage of
TBufferedTransport.

These changes allow users to continue reporting spans oob to the local
jaeger-agent but with the option of reporting to Zipkin backend.

In the spirit of
jaegertracing/jaeger-client-java#399
and
jaegertracing/jaeger-client-go#310

Signed-off-by: Ryan Fitzpatrick <[email protected]>
  • Loading branch information
Ryan Fitzpatrick committed Jul 6, 2018
1 parent 00d3d4a commit cdbd4e1
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 61 deletions.
6 changes: 3 additions & 3 deletions crossdock/server/endtoend.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
SAMPLER_TYPE_REMOTE,
)
from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
from jaeger_client.reporter import Reporter
from jaeger_client.reporter import ThriftReporter
from jaeger_client.throttler import RemoteThrottler
from jaeger_client.tracer import Tracer

Expand Down Expand Up @@ -67,8 +67,8 @@ def __init__(self):
init_sampler = cfg.sampler
channel = self.local_agent_sender

reporter = Reporter(channel=channel,
flush_interval=cfg.reporter_flush_interval)
reporter = ThriftReporter(channel=channel,
flush_interval=cfg.reporter_flush_interval)

remote_sampler = RemoteControlledSampler(
channel=channel,
Expand Down
72 changes: 59 additions & 13 deletions jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import opentracing
from opentracing.propagation import Format
from . import Tracer
from .local_agent_net import LocalAgentSender
from .local_agent_net import LocalAgentReader, LocalAgentSender
from .throttler import RemoteThrottler
from .reporter import (
Reporter,
ThriftReporter,
CompositeReporter,
LoggingReporter,
ZipkinV2Reporter,
)
from .sampler import (
ConstSampler,
Expand Down Expand Up @@ -115,6 +116,7 @@ def _validate_config(self, config):
'sampler',
'tags',
'enabled',
'reporter_type',
'reporter_batch_size',
'propagation',
'max_tag_value_length',
Expand All @@ -123,7 +125,9 @@ def _validate_config(self, config):
'trace_id_header',
'baggage_header_prefix',
'service_name',
'throttler']
'throttler',
'headers',
'zipkin_spans_url']
config_keys = config.keys()
unexpected_config_keys = [k for k in config_keys if k not in allowed_keys]
if unexpected_config_keys:
Expand All @@ -146,6 +150,13 @@ def error_reporter(self):
def enabled(self):
return get_boolean(self.config.get('enabled', True), True)

@property
def reporter_type(self):
rt = self.config.get('reporter_type', 'jaeger').lower()
if rt not in ('jaeger', 'zipkin_v2'):
raise ValueError('config reporter_type must be "jaeger" or "zipkin_v2"')
return rt

@property
def reporter_batch_size(self):
return int(self.config.get('reporter_batch_size', 10))
Expand Down Expand Up @@ -312,6 +323,14 @@ def throttler_refresh_interval(self):
except:
return DEFAULT_THROTTLER_REFRESH_INTERVAL

@property
def headers(self):
return self.config.get('headers', {})

@property
def zipkin_spans_url(self):
return self.config.get('zipkin_spans_url', 'http://localhost:9411/api/v2/spans')

@staticmethod
def initialized():
with Config._initialized_lock:
Expand Down Expand Up @@ -353,14 +372,32 @@ def new_tracer(self, io_loop=None):
max_operations=self.max_operations)
logger.info('Using sampler %s', sampler)

reporter = Reporter(
channel=channel,
queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
logger=logger,
metrics_factory=self._metrics_factory,
error_reporter=self.error_reporter)
if self.reporter_type == 'jaeger':
reporter = ThriftReporter(
channel=channel,
queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
logger=logger,
metrics_factory=self._metrics_factory,
error_reporter=self.error_reporter
)
elif self.reporter_type == 'zipkin_v2':
kwargs = {}
if self.headers:
kwargs['headers'] = self.headers

reporter = ZipkinV2Reporter(
channel=channel,
spans_url=self.zipkin_spans_url,
queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
logger=logger,
metrics_factory=self._metrics_factory,
error_reporter=self.error_reporter,
**kwargs
)

if self.logging:
reporter = CompositeReporter(reporter, LoggingReporter(logger))
Expand Down Expand Up @@ -405,11 +442,20 @@ def _initialize_global_tracer(self, tracer):
def _create_local_agent_channel(self, io_loop):
"""
Create an out-of-process channel communicating to local jaeger-agent.
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
via JSON HTTP.
If using Jaeger backend, spans are submitted as SOCK_DGRAM Thrift, sampling
strategy is polled via JSON HTTP.
:param self: instance of Config
"""
if self.reporter_type == 'zipkin_v2':
logger.info('Initializing Jaeger Tracer with Zipkin V2 reporter')
return LocalAgentReader(
host=self.local_agent_reporting_host,
sampling_port=self.local_agent_sampling_port,
reporting_port=self.local_agent_reporting_port,
throttling_port=self.throttler_port,
io_loop=io_loop
)
logger.info('Initializing Jaeger Tracer with UDP reporter')
return LocalAgentSender(
host=self.local_agent_reporting_host,
Expand Down
46 changes: 28 additions & 18 deletions jaeger_client/local_agent_net.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,12 @@ def request_throttling_credits(self,
] + [('operations', op) for op in operations])


class LocalAgentSender(TBufferedTransport):
class LocalAgentReader(object):
"""
LocalAgentSender implements everything necessary to communicate with
local jaeger-agent. This class is designed to work in tornado and
non-tornado environments. If in torndado, pass in the ioloop, if not
then LocalAgentSender will create one for itself.
NOTE: LocalAgentSender derives from TBufferedTransport. This will buffer
up all written data until flush() is called. Flush gets called at the
end of the batch span submission call.
LocalAgentReader implements what is necessary to obtain sampling strategies
and throttling credits from the local jaeger-agent. This class is designed
to work in tornado and non-tornado environments. If in torndado, pass in the
ioloop, if not then LocalAgentSender will create one for itself.
"""

def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None):
Expand All @@ -77,11 +73,6 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling
if throttling_port:
self.throttling_http = LocalAgentHTTP(host, throttling_port)

# UDP reporting - this will only get written to after our flush() call.
# We are buffering things up because we are a TBufferedTransport.
udp = TUDPTransport(host, reporting_port)
TBufferedTransport.__init__(self, udp)

def _create_new_thread_loop(self):
"""
Create a daemonized thread that will run Tornado IOLoop.
Expand All @@ -92,14 +83,33 @@ def _create_new_thread_loop(self):
self._thread_loop.start()
return self._thread_loop._io_loop

def readFrame(self):
"""Empty read frame that is never ready"""
return Future()

# Pass-through for HTTP sampling strategies request.
def request_sampling_strategy(self, *args, **kwargs):
return self.local_agent_http.request_sampling_strategy(*args, **kwargs)

# Pass-through for HTTP throttling credit request.
def request_throttling_credits(self, *args, **kwargs):
return self.throttling_http.request_throttling_credits(*args, **kwargs)


class LocalAgentSender(LocalAgentReader, TBufferedTransport):
"""
LocalAgentSender implements everything necessary to report spans to
the local jaeger-agent.
NOTE: LocalAgentSender derives from TBufferedTransport. This will buffer
up all written data until flush() is called. Flush gets called at the
end of the batch span submission call.
"""

def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None):
LocalAgentReader.__init__(self, host, sampling_port, reporting_port,
io_loop, throttling_port)
# UDP reporting - this will only get written to after our flush() call.
# We are buffering things up because we are a TBufferedTransport.
udp = TUDPTransport(host, reporting_port)
TBufferedTransport.__init__(self, udp)

def readFrame(self):
"""Empty read frame that is never ready"""
return Future()
Loading

0 comments on commit cdbd4e1

Please sign in to comment.