Skip to content
This repository has been archived by the owner on Apr 2, 2020. It is now read-only.

Commit

Permalink
Merge pull request #18 from booster-proj/issue/15
Browse files Browse the repository at this point in the history
Issue/15
  • Loading branch information
dmorn authored Dec 22, 2018
2 parents 5dbb1a3 + 0414041 commit 1a6d9d4
Show file tree
Hide file tree
Showing 13 changed files with 342 additions and 124 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ dist/
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
*.json
data/

# Misc
.DS_Store
*.swp
_test/
.prometheus.yml
8 changes: 7 additions & 1 deletion cmd/booster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/booster-proj/booster"
"github.com/booster-proj/booster/core"
"github.com/booster-proj/booster/metrics"
"github.com/booster-proj/booster/remote"
"github.com/booster-proj/booster/source"
"github.com/booster-proj/booster/store"
Expand Down Expand Up @@ -89,13 +90,18 @@ func main() {
}

b := new(core.Balancer)
mb := new(metrics.Broker)
rs := store.New(b)
l := source.NewListener(rs)
l := source.NewListener(source.Config{
Store: rs,
MetricsBroker: mb,
})
d := booster.New(b)

router := remote.NewRouter()
router.Config = config
router.Store = rs
router.MetricsProvider = mb
router.SetupRoutes()
r := remote.New(router)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/gorilla/mux v1.6.2
github.com/grandcat/zeroconf v0.0.0-20180329153754-df75bb3ccae1
github.com/miekg/dns v1.1.1 // indirect
github.com/prometheus/client_golang v0.9.2
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 // indirect
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/sys v0.0.0-20181026064943-731415f00dce
Expand Down
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/booster-proj/proxy v0.1.3 h1:DrXIF0u8A0I+2KO3FTpzMeN646i1/fVvvP8OiMVHLKU=
github.com/booster-proj/proxy v0.1.3/go.mod h1:le5Yiwdxl9hONszuGsdz3dJtafDZs938B2hwn7DctAc=
github.com/cenkalti/backoff v2.1.0+incompatible h1:FIRvWBZrzS4YC7NT5cOuZjexzFvIr+Dbi6aD1cZaNBk=
github.com/cenkalti/backoff v2.1.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/grandcat/zeroconf v0.0.0-20180329153754-df75bb3ccae1 h1:VSELJSxQlpi1bz4ZwT+93hPpzNLRcgytLr77iVRJpcE=
github.com/grandcat/zeroconf v0.0.0-20180329153754-df75bb3ccae1/go.mod h1:YjKB0WsLXlMkO9p+wGTCoPIDGRJH0mz7E526PxkQVxI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.1.1 h1:DVkblRdiScEnEr0LR9nTnEQqHYycjkXW9bOjd+2EL2o=
github.com/miekg/dns v1.1.1/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a h1:8fCF9zjAir2SP3N+axz9xs+0r4V8dqPzqsWO10t8zoo=
golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
Expand Down
73 changes: 73 additions & 0 deletions metrics/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright (C) 2018 KIM KeepInMind GmbH/srl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

// Package metrics provides what in prometheus terms is called a
// metrics exporter.
package metrics

import (
"net/http"

"github.com/booster-proj/booster/source"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Broker can be used to both capture and serve metrics.
type Broker struct {
}

// ServeHTTP is just a wrapper around the ServeHTTP function
// of the prohttp default Handler.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
promhttp.Handler().ServeHTTP(w, r)
}

const namespace = "booster"

var (
sendBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "network_send_bytes",
Help: "Sent bytes for network source",
}, []string{"source", "target"})

receiveBytes = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "network_receive_bytes",
Help: "Received bytes for network source",
}, []string{"source", "target"})
)

func init() {
prometheus.MustRegister(sendBytes)
prometheus.MustRegister(receiveBytes)
}

// SendDataFlow can be used to update the metrics exported by the broker
// about network usage, in particular upload and download bandwidth. `data`
// Type should either be "read" or "write", referring respectively to download
// and upload operations.
func (b *Broker) SendDataFlow(labels map[string]string, data *source.DataFlow) {
switch data.Type {
case "read":
receiveBytes.With(prometheus.Labels(labels)).Add(float64(data.N))
case "write":
sendBytes.With(prometheus.Labels(labels)).Add(float64(data.N))
default:
}
}
18 changes: 12 additions & 6 deletions remote/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
type Router struct {
r *mux.Router

Config booster.Config
Store *store.SourceStore
Config booster.Config
Store *store.SourceStore
MetricsProvider http.Handler
}

func NewRouter() *Router {
Expand All @@ -42,10 +43,15 @@ func NewRouter() *Router {
// properly.
func (r *Router) SetupRoutes() {
router := r.r
router.HandleFunc("/health", makeHealthCheckHandler(r.Config))
router.HandleFunc("/sources", makeSourcesHandler(r.Store))
router.HandleFunc("/sources/{name}/block", makeBlockHandler(r.Store)).Methods("POST", "DELETE")
router.HandleFunc("/policies", makePoliciesHandler(r.Store))
router.HandleFunc("/health.json", makeHealthCheckHandler(r.Config))
if store := r.Store; store != nil {
router.HandleFunc("/sources.json", makeSourcesHandler(store))
router.HandleFunc("/sources/{name}/block.json", makeBlockHandler(store)).Methods("POST", "DELETE")
router.HandleFunc("/policies.json", makePoliciesHandler(store))
}
if handler := r.MetricsProvider; handler != nil {
router.Handle("/metrics", handler)
}
router.Use(loggingMiddleware)
}

Expand Down
101 changes: 77 additions & 24 deletions source/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,104 @@ along with this program. If not, see <https://www.gnu.org/licenses/>.
package source

import (
"fmt"
"math/rand"
"net"
"time"
)

// conn is a wrapper around net.Conn, with the addition of some functions
// DataFlow collects data about a data tranmission.
type DataFlow struct {
Type string
StartedAt time.Time // Start of the first data transmitted.
EndedAt time.Time // Time of the last byte read/written. May be overridden multiple times.
N int // Number of bytes transmitted.
Avg float64 // Avg bytes/seconds.
}

// Begin sets the data flow start value.
func (f *DataFlow) Start() {
f.StartedAt = time.Now()
}

// End computes the Avg, N, and End valus considering start as
// beginning of this data transmission.
func (f *DataFlow) Stop(n int) {
end := time.Now()
d := end.Sub(f.StartedAt)

f.N += n
f.EndedAt = end

avg := float64(n) / d.Seconds() // avg transmission speed of this connection.
f.Avg = avg
}

// Conn is a wrapper around net.Conn, with the addition of some functions
// useful to uniquely identify the connection and receive callbacks on
// close events.
type conn struct {
type Conn struct {
net.Conn

id string
ref string // Reference identifier, usually the parent's source identifier.
closed bool
onClose func(string) // Callback for close event.
closed bool // tells wether the connection was closed.
OnClose func() // Callback for close event.
OnRead func(df *DataFlow)
OnWrite func(df *DataFlow)
}

func newConn(c net.Conn, ref string) *conn {
r := rand.Int()
uuid := fmt.Sprintf("%s-%d", ref, r)
if c != nil {
uuid = fmt.Sprintf("%v-%v@%d-%d", c.LocalAddr(), c.RemoteAddr(), time.Now().UnixNano(), r)
}
// Read is the io.Reader implementation of Conn. It forwards the request
// to the underlying net.Conn, but it also records the number of bytes
// tranferred and the duration of the transmission. It then exposes the
// data using the OnRead callback.
func (c *Conn) Read(p []byte) (int, error) {
dl := &DataFlow{Type: "read"}
dl.Start()
n, err := c.Conn.Read(p) // Transmit the data.

return &conn{
Conn: c,
id: uuid,
ref: ref,
}
go func() {
if n == 0 {
return
}
dl.Stop(n)
if f := c.OnRead; f != nil {
f(dl)
}
}()

return n, err
}

func (c *conn) uuid() string {
return c.id
// Write is the io.Writer implementation of Conn. It forwards the request
// to the underlying net.Conn, but it also records the number of bytes
// tranferred and the duration of the transmission. It then exposes the
// data using the OnWrite callback.
func (c *Conn) Write(p []byte) (int, error) {
upl := &DataFlow{Type: "write"}
upl.Start()
n, err := c.Conn.Write(p) // Transmit the data.

go func() {
if n == 0 {
return
}
upl.Stop(n)
if f := c.OnWrite; f != nil {
f(upl)
}
}()

return n, err
}

func (c *conn) Close() error {
// Close closes the underlying net.Conn, calling the OnClose callback
// afterwards.
func (c *Conn) Close() error {
if c.closed {
// Multiple parts of the code might try to close the connection. Better be sure
// that the underlying connection gets closed at some point, leave that code and
// avoid repetitions here.
return nil
}
if f := c.onClose; f != nil {
defer f(c.uuid())
if f := c.OnClose; f != nil {
defer f()
}

c.closed = true
Expand Down
Loading

0 comments on commit 1a6d9d4

Please sign in to comment.