Skip to content

Commit

Permalink
Merge pull request #69 from wavefrontHQ/specify-traces-port
Browse files Browse the repository at this point in the history
Support sending spans and metrics to different ports using the WavefrontSender
  • Loading branch information
oppegard authored Aug 10, 2021
2 parents 2f5be62 + e5a121d commit c06327c
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 76 deletions.
2 changes: 1 addition & 1 deletion internal/lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type LineHandler struct {
done chan struct{}
}

var throttledSleepDuration = time.Duration(time.Second * 30)
var throttledSleepDuration = time.Second * 30
var errThrottled = errors.New("error: throttled event creation")

type LineHandlerOption func(*LineHandler)
Expand Down
3 changes: 2 additions & 1 deletion internal/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type reporter struct {
client *http.Client
}

// Newreporter create a metrics Reporter
// NewReporter create a metrics Reporter
func NewReporter(server string, token string) Reporter {
return &reporter{
serverURL: server,
Expand All @@ -26,6 +26,7 @@ func NewReporter(server string, token string) Reporter {
}
}

// Report creates and sends a POST to the reportEndpoint with the given pointLines
func (reporter reporter) Report(format string, pointLines string) (*http.Response, error) {
if format == "" || pointLines == "" {
return nil, formatError
Expand Down
55 changes: 0 additions & 55 deletions senders/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package senders

import (
"fmt"
"os"
"strconv"
"time"

"github.com/wavefronthq/wavefront-sdk-go/event"
Expand Down Expand Up @@ -54,59 +52,6 @@ type wavefrontSender struct {
proxy bool
}

// newWavefrontClient creates and returns a Wavefront Client instance
func newWavefrontClient(cfg *configuration) (Sender, error) {
if cfg.BatchSize == 0 {
cfg.BatchSize = defaultBatchSize
}
if cfg.MaxBufferSize == 0 {
cfg.MaxBufferSize = defaultBufferSize
}
if cfg.FlushIntervalSeconds == 0 {
cfg.FlushIntervalSeconds = defaultFlushInterval
}

reporter := internal.NewReporter(cfg.Server, cfg.Token)

sender := &wavefrontSender{
defaultSource: internal.GetHostname("wavefront_direct_sender"),
proxy: len(cfg.Token) == 0,
}
sender.internalRegistry = internal.NewMetricRegistry(
sender,
internal.SetPrefix("~sdk.go.core.sender.direct"),
internal.SetTag("pid", strconv.Itoa(os.Getpid())),
)
sender.pointHandler = newLineHandler(reporter, cfg, internal.MetricFormat, "points", sender.internalRegistry)
sender.histoHandler = newLineHandler(reporter, cfg, internal.HistogramFormat, "histograms", sender.internalRegistry)
sender.spanHandler = newLineHandler(reporter, cfg, internal.TraceFormat, "spans", sender.internalRegistry)
sender.spanLogHandler = newLineHandler(reporter, cfg, internal.SpanLogsFormat, "span_logs", sender.internalRegistry)
sender.eventHandler = newLineHandler(reporter, cfg, internal.EventFormat, "events", sender.internalRegistry)

sender.pointsValid = sender.internalRegistry.NewDeltaCounter("points.valid")
sender.pointsInvalid = sender.internalRegistry.NewDeltaCounter("points.invalid")
sender.pointsDropped = sender.internalRegistry.NewDeltaCounter("points.dropped")

sender.histogramsValid = sender.internalRegistry.NewDeltaCounter("histograms.valid")
sender.histogramsInvalid = sender.internalRegistry.NewDeltaCounter("histograms.invalid")
sender.histogramsDropped = sender.internalRegistry.NewDeltaCounter("histograms.dropped")

sender.spansValid = sender.internalRegistry.NewDeltaCounter("spans.valid")
sender.spansInvalid = sender.internalRegistry.NewDeltaCounter("spans.invalid")
sender.spansDropped = sender.internalRegistry.NewDeltaCounter("spans.dropped")

sender.spanLogsValid = sender.internalRegistry.NewDeltaCounter("span_logs.valid")
sender.spanLogsInvalid = sender.internalRegistry.NewDeltaCounter("span_logs.invalid")
sender.spanLogsDropped = sender.internalRegistry.NewDeltaCounter("span_logs.dropped")

sender.eventsValid = sender.internalRegistry.NewDeltaCounter("events.valid")
sender.eventsInvalid = sender.internalRegistry.NewDeltaCounter("events.invalid")
sender.eventsDropped = sender.internalRegistry.NewDeltaCounter("events.dropped")

sender.Start()
return sender, nil
}

func newLineHandler(reporter internal.Reporter, cfg *configuration, format, prefix string, registry *internal.MetricRegistry) *internal.LineHandler {
flushInterval := time.Second * time.Duration(cfg.FlushIntervalSeconds)

Expand Down
108 changes: 102 additions & 6 deletions senders/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ package senders
import (
"fmt"
"net/url"
"os"
"strconv"
"strings"

"github.com/wavefronthq/wavefront-sdk-go/internal"
)

const (
defaultTracesPort = 30001
defaultMetricsPort = 2878
)

// Option Wavefront client configuration options
Expand All @@ -17,6 +26,9 @@ type configuration struct {
// Optional configuration properties. Default values should suffice for most use cases.
// override the defaults only if you wish to set higher values.

MetricsPort int
TracesPort int

// max batch of data sent per flush interval. defaults to 10,000. recommended not to exceed 40,000.
BatchSize int

Expand All @@ -34,47 +46,131 @@ type configuration struct {

// NewSender creates Wavefront client
func NewSender(wfURL string, setters ...Option) (Sender, error) {
cfg := &configuration{}
cfg, err := CreateConfig(wfURL, setters...)
if err != nil {
return nil, fmt.Errorf("unable to create sender config: %s", err)
}
return newWavefrontClient(cfg)
}

func CreateConfig(wfURL string, setters ...Option) (*configuration, error) {
cfg := &configuration{
MetricsPort: defaultMetricsPort,
TracesPort: defaultTracesPort,
BatchSize: defaultBatchSize,
MaxBufferSize: defaultBufferSize,
FlushIntervalSeconds: defaultFlushInterval,
}

u, err := url.Parse(wfURL)
if err != nil {
return nil, err
}

if !strings.HasPrefix(strings.ToLower(u.Scheme), "http") {
return nil, fmt.Errorf("invalid schema '%s', only 'http' is supported", u.Scheme)
return nil, fmt.Errorf("invalid scheme '%s' in '%s', only 'http' is supported", u.Scheme, u)
}

if len(u.User.String()) > 0 {
cfg.Token = u.User.String()
u.User = nil
}

if u.Port() != "" {
port, err := strconv.Atoi(u.Port())
if err != nil {
return nil, fmt.Errorf("unable to convert port to integer: %s", err)
}
cfg.MetricsPort = port
cfg.TracesPort = port
u.Host = u.Hostname()
}
cfg.Server = u.String()

for _, set := range setters {
set(cfg)
}
return newWavefrontClient(cfg)
return cfg, nil
}

// newWavefrontClient creates a Wavefront sender
func newWavefrontClient(cfg *configuration) (Sender, error) {
metricsReporter := internal.NewReporter(fmt.Sprintf("%s:%d", cfg.Server, cfg.MetricsPort), cfg.Token)
tracesReporter := internal.NewReporter(fmt.Sprintf("%s:%d", cfg.Server, cfg.TracesPort), cfg.Token)

sender := &wavefrontSender{
defaultSource: internal.GetHostname("wavefront_direct_sender"),
proxy: len(cfg.Token) == 0,
}
sender.initializeInternalMetrics()
sender.pointHandler = newLineHandler(metricsReporter, cfg, internal.MetricFormat, "points", sender.internalRegistry)
sender.histoHandler = newLineHandler(metricsReporter, cfg, internal.HistogramFormat, "histograms", sender.internalRegistry)
sender.spanHandler = newLineHandler(tracesReporter, cfg, internal.TraceFormat, "spans", sender.internalRegistry)
sender.spanLogHandler = newLineHandler(tracesReporter, cfg, internal.SpanLogsFormat, "span_logs", sender.internalRegistry)
sender.eventHandler = newLineHandler(metricsReporter, cfg, internal.EventFormat, "events", sender.internalRegistry)

sender.Start()
return sender, nil
}

// BatchSize set max batch of data sent per flush interval. defaults to 10,000. recommended not to exceed 40,000.
func (sender *wavefrontSender) initializeInternalMetrics() {
sender.internalRegistry = internal.NewMetricRegistry(
sender,
internal.SetPrefix("~sdk.go.core.sender.direct"),
internal.SetTag("pid", strconv.Itoa(os.Getpid())),
)
sender.pointsValid = sender.internalRegistry.NewDeltaCounter("points.valid")
sender.pointsInvalid = sender.internalRegistry.NewDeltaCounter("points.invalid")
sender.pointsDropped = sender.internalRegistry.NewDeltaCounter("points.dropped")

sender.histogramsValid = sender.internalRegistry.NewDeltaCounter("histograms.valid")
sender.histogramsInvalid = sender.internalRegistry.NewDeltaCounter("histograms.invalid")
sender.histogramsDropped = sender.internalRegistry.NewDeltaCounter("histograms.dropped")

sender.spansValid = sender.internalRegistry.NewDeltaCounter("spans.valid")
sender.spansInvalid = sender.internalRegistry.NewDeltaCounter("spans.invalid")
sender.spansDropped = sender.internalRegistry.NewDeltaCounter("spans.dropped")

sender.spanLogsValid = sender.internalRegistry.NewDeltaCounter("span_logs.valid")
sender.spanLogsInvalid = sender.internalRegistry.NewDeltaCounter("span_logs.invalid")
sender.spanLogsDropped = sender.internalRegistry.NewDeltaCounter("span_logs.dropped")

sender.eventsValid = sender.internalRegistry.NewDeltaCounter("events.valid")
sender.eventsInvalid = sender.internalRegistry.NewDeltaCounter("events.invalid")
sender.eventsDropped = sender.internalRegistry.NewDeltaCounter("events.dropped")
}

// BatchSize set max batch of data sent per flush interval. Defaults to 10,000. recommended not to exceed 40,000.
func BatchSize(n int) Option {
return func(cfg *configuration) {
cfg.BatchSize = n
}
}

// MaxBufferSize set the size of internal buffers beyond which received data is dropped.
// MaxBufferSize set the size of internal buffers beyond which received data is dropped. Defaults to 50,000.
func MaxBufferSize(n int) Option {
return func(cfg *configuration) {
cfg.MaxBufferSize = n
}
}

// FlushIntervalSeconds set the interval (in seconds) at which to flush data to Wavefront. defaults to 1 Second.
// FlushIntervalSeconds set the interval (in seconds) at which to flush data to Wavefront. Defaults to 1 Second.
func FlushIntervalSeconds(n int) Option {
return func(cfg *configuration) {
cfg.FlushIntervalSeconds = n
}
}

// MetricsPort sets the port on which to report metrics. Default is 2878.
func MetricsPort(port int) Option {
return func(cfg *configuration) {
cfg.MetricsPort = port
}
}

// TracesPort sets the port on which to report traces. Default is 30001.
func TracesPort(port int) Option {
return func(cfg *configuration) {
cfg.TracesPort = port
}
}
86 changes: 86 additions & 0 deletions senders/client_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package senders_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/wavefronthq/wavefront-sdk-go/senders"
)

func TestInvalidURL(t *testing.T) {
_, err := senders.CreateConfig("%%%%")
assert.Error(t, err)
}

func TestScheme(t *testing.T) {
_, err := senders.CreateConfig("http://localhost")
require.NoError(t, err)
_, err = senders.CreateConfig("https://localhost")
require.NoError(t, err)

_, err = senders.CreateConfig("gopher://localhost")
require.Error(t, err)
}

func TestPortExtractedFromURL(t *testing.T) {
cfg, err := senders.CreateConfig("http://localhost:1234")
require.NoError(t, err)
assert.Equal(t, 1234, cfg.MetricsPort)
assert.Equal(t, 1234, cfg.TracesPort)
}

func TestToken(t *testing.T) {
cfg, err := senders.CreateConfig("https://my-api-token@localhost")
require.NoError(t, err)

assert.Equal(t, "my-api-token", cfg.Token)
assert.Equal(t, "https://localhost", cfg.Server)
}

func TestDefaults(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost")
require.NoError(t, err)

assert.Equal(t, 10000, cfg.BatchSize)
assert.Equal(t, 1, cfg.FlushIntervalSeconds)
assert.Equal(t, 50000, cfg.MaxBufferSize)
assert.Equal(t, 2878, cfg.MetricsPort)
assert.Equal(t, 30001, cfg.TracesPort)
}

func TestBatchSize(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.BatchSize(123))
require.NoError(t, err)

assert.Equal(t, 123, cfg.BatchSize)
}

func TestFlushIntervalSeconds(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.FlushIntervalSeconds(123))
require.NoError(t, err)

assert.Equal(t, 123, cfg.FlushIntervalSeconds)
}

func TestMaxBufferSize(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.MaxBufferSize(123))
require.NoError(t, err)

assert.Equal(t, 123, cfg.MaxBufferSize)
}

func TestMetricsPort(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.MetricsPort(123))
require.NoError(t, err)

assert.Equal(t, 123, cfg.MetricsPort)
}

func TestTracesPort(t *testing.T) {
cfg, err := senders.CreateConfig("https://localhost", senders.TracesPort(123))
require.NoError(t, err)

assert.Equal(t, 123, cfg.TracesPort)
}
19 changes: 6 additions & 13 deletions senders/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/wavefronthq/wavefront-sdk-go/histogram"
"github.com/wavefronthq/wavefront-sdk-go/senders"
)
Expand Down Expand Up @@ -50,25 +52,16 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

func TestInvalidURL(t *testing.T) {
_, err := senders.NewSender("tut:ut:u")
assert.NotNil(t, err)
}

func TestSendDirect(t *testing.T) {
wf, err := senders.NewSender("http://" + token + "@localhost:" + wfPort)
assert.Nil(t, err)
if wf != nil {
doTest(t, wf)
}
require.NoError(t, err)
doTest(t, wf)
}

func TestSendProxy(t *testing.T) {
wf, err := senders.NewSender("http://localhost:" + proxyPort)
assert.Nil(t, err)
if wf != nil {
doTest(t, wf)
}
require.NoError(t, err)
doTest(t, wf)
}

func doTest(t *testing.T, wf senders.Sender) {
Expand Down

0 comments on commit c06327c

Please sign in to comment.