Skip to content

Commit

Permalink
Merge pull request #4 from bonitoo-io/feature/query_benchmarker_teard…
Browse files Browse the repository at this point in the history
…own_sync

Feature/query benchmarker teardown sync
  • Loading branch information
alespour authored Aug 8, 2019
2 parents a96a43b + 561af60 commit 7233853
Showing 1 changed file with 72 additions and 20 deletions.
92 changes: 72 additions & 20 deletions cmd/query_benchmarker_influxdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
package main

import (
"context"
"encoding/gob"
"flag"
"fmt"
"github.com/influxdata/influxdb-comparisons/bulk_load"
"io"
"log"
"math/rand"
"net"
"net/http"
"net/rpc"
"os"
"runtime/pprof"
Expand Down Expand Up @@ -56,6 +59,7 @@ var (
gradualWorkersMax int
increaseInterval time.Duration
notificationHostPort string
notificationListenPort int
dialTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
Expand Down Expand Up @@ -86,6 +90,9 @@ var (
batchSize int
movingAverageStat *TimedStatGroup
isBurnIn bool
scanClose chan int
notificationServer *http.Server
externalFinished bool
)

type statsMap map[string]*StatGroup
Expand Down Expand Up @@ -118,7 +125,8 @@ func init() {
flag.DurationVar(&increaseInterval, "increase-interval", time.Second*30, "Interval when number of workers will increase")
flag.DurationVar(&testDuration, "benchmark-duration", time.Second*0, "Run querying continually for defined time interval, instead of stopping after all queries have been used")
flag.DurationVar(&responseTimeLimit, "response-time-limit", time.Second*0, "Query response time limit, after which will client stop.")
flag.StringVar(&notificationHostPort, "notification-target", "", "host:port of finish message notification receiver")
flag.StringVar(&notificationHostPort, "notification-target", "", "host:port of finish message notification receiver(s)")
flag.IntVar(&notificationListenPort, "notification-port", -1, "Listen port for remote notification messages. Used to remotely finish benchmark. -1 to disable feature")
flag.DurationVar(&dialTimeout, "dial-timeout", time.Second*15, "TCP dial timeout.")
flag.DurationVar(&readTimeout, "write-timeout", time.Second*300, "TCP write timeout.")
flag.DurationVar(&writeTimeout, "read-timeout", time.Second*300, "TCP read timeout.")
Expand Down Expand Up @@ -196,6 +204,20 @@ func init() {
if trendSamples <= 0 {
trendSamples = int(increaseInterval.Seconds())
}

if notificationListenPort > 0 { // copied from bulk_load_influx/main.go
notif := new(bulk_load.NotifyReceiver)
rpc.Register(notif)
rpc.HandleHTTP()
bulk_load.RegisterHandler(notifyHandler)
l, e := net.Listen("tcp", fmt.Sprintf(":%d", notificationListenPort))
if e != nil {
log.Fatal("listen error:", e)
}
log.Println("Listening for incoming notification")
notificationServer = &http.Server{}
go notificationServer.Serve(l)
}
}

func main() {
Expand Down Expand Up @@ -284,7 +306,7 @@ func main() {
wallStart := time.Now()

scanRes := make(chan int)
scanClose := make(chan int)
scanClose = make(chan int)
responseTimeLimitReached := false
timeoutReached := false
timeLimit := testDuration.Nanoseconds() > 0
Expand Down Expand Up @@ -337,7 +359,7 @@ loop:
}
}
case <-responseTicker.C:
if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*3 < int64(movingAverageStat.Avg()*1e6) {
if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*2 < int64(movingAverageStat.Avg()*1e6) {
responseTimeLimitReached = true
scanClose <- 1
respLimitms := float64(responseTimeLimit.Nanoseconds()) / 1e6
Expand All @@ -360,11 +382,7 @@ loop:
if timeLimit && tickerQuaters > 3 && !timeoutReached {
timeoutReached = true
log.Println("Time out reached")
if !scanFinished {
scanClose <- 1
} else {
log.Println("Scan already finished")
}
terminate()
if responseTimeLimit > 0 {
//still try to find response time limit
respLimitms := float64(responseTimeLimit.Nanoseconds()) / 1e6
Expand Down Expand Up @@ -431,16 +449,26 @@ waitLoop:
fmt.Println("done shutting down telemetry.")
}

if notificationHostPort != "" {
client, err := rpc.DialHTTP("tcp", notificationHostPort)
if err != nil {
log.Println("error: dialing:", err)
} else {
var res int
input := 0
call := client.Go("NotifyReceiver.Notify", input, &res, nil)
if call.Error != nil {
log.Println("error: calling:", call.Error)
if notificationServer != nil {
fmt.Println("shutting down notification listener...")
notificationServer.Shutdown(context.Background())
fmt.Println("done shutting down notification listener.")
}

if notificationHostPort != "" && !externalFinished {
targets := strings.Split(notificationHostPort, ",")
for _, target := range targets {
client, err := rpc.DialHTTP("tcp", target)
if err != nil {
log.Printf("error dialing %s: %v", target, err)
} else {
var res int
input := 0
call := client.Go("NotifyReceiver.Notify", input, &res, nil)
if call.Error != nil {
log.Printf("error calling %s: %v", target, call.Error)
}
client.Close()
}
}
}
Expand Down Expand Up @@ -550,7 +578,7 @@ loop:
qind++
select {
case <-closeChan:
fmt.Printf("Received finish request\n")
log.Println("Received finish request")
break loop
default:
}
Expand Down Expand Up @@ -742,3 +770,27 @@ func fprintStats(w io.Writer, statGroups statsMap) {
}
}
}

func terminate() {
if !scanFinished {
scanClose <- 1
} else {
log.Println("Scan already finished")
}
}

func notifyHandler(arg int) (int, error) {
var e error
if arg == 0 {
log.Println("Received external finish request")
if !externalFinished {
externalFinished = true
terminate()
} else {
log.Println("External finish request already received")
}
} else {
e = fmt.Errorf("unknown notification code: %d", arg)
}
return 0, e
}

0 comments on commit 7233853

Please sign in to comment.