From 3f0c62491ef7f630aee4f6b44ee577694f21ca8f Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 8 Aug 2019 09:58:15 +0200 Subject: [PATCH 1/3] end test sooner when RT >= 2*limit --- cmd/query_benchmarker_influxdb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 810a1442..690ef7a2 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -328,7 +328,7 @@ loop: } } case <-responseTicker.C: - if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*3 < int64(movingAverageStat.Avg()*1e6) { + if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*2 < int64(movingAverageStat.Avg()*1e6) { responseTimeLimitReached = true scanClose <- 1 respLimitms := float64(responseTimeLimit.Nanoseconds()) / 1e6 From b91ad78a41945ef7bc1eec8f32a0fc8cf0effdaf Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 8 Aug 2019 11:50:29 +0200 Subject: [PATCH 2/3] log finish request with timestamp --- cmd/query_benchmarker_influxdb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 690ef7a2..6160e5b5 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -541,7 +541,7 @@ loop: qind++ select { case <-closeChan: - fmt.Printf("Received finish request\n") + log.Print("Received finish request") break loop default: } From f1a805a90e8e4d08433edf4e951d83e51d51d3a5 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 8 Aug 2019 16:08:43 +0200 Subject: [PATCH 3/3] added support for query benchmarker notifications --- cmd/query_benchmarker_influxdb/main.go | 89 ++++++++++++++++++++------ 1 file changed, 71 insertions(+), 18 deletions(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 6160e5b5..4cc5e10a 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -8,11 +8,15 @@ package main import ( + "context" "encoding/gob" "flag" "fmt" + "github.com/influxdata/influxdb-comparisons/bulk_load" "io" "log" + "net" + "net/http" "net/rpc" "os" "runtime/pprof" @@ -55,6 +59,7 @@ var ( gradualWorkersMax int increaseInterval time.Duration notificationHostPort string + notificationListenPort int dialTimeout time.Duration readTimeout time.Duration writeTimeout time.Duration @@ -85,6 +90,9 @@ var ( batchSize int movingAverageStat *TimedStatGroup isBurnIn bool + scanClose chan int + notificationServer *http.Server + externalFinished bool ) type statsMap map[string]*StatGroup @@ -117,7 +125,8 @@ func init() { flag.DurationVar(&increaseInterval, "increase-interval", time.Second*30, "Interval when number of workers will increase") flag.DurationVar(&testDuration, "benchmark-duration", time.Second*0, "Run querying continually for defined time interval, instead of stopping after all queries have been used") flag.DurationVar(&responseTimeLimit, "response-time-limit", time.Second*0, "Query response time limit, after which will client stop.") - flag.StringVar(¬ificationHostPort, "notification-target", "", "host:port of finish message notification receiver") + flag.StringVar(¬ificationHostPort, "notification-target", "", "host:port of finish message notification receiver(s)") + flag.IntVar(¬ificationListenPort, "notification-port", -1, "Listen port for remote notification messages. Used to remotely finish benchmark. -1 to disable feature") flag.DurationVar(&dialTimeout, "dial-timeout", time.Second*15, "TCP dial timeout.") flag.DurationVar(&readTimeout, "write-timeout", time.Second*300, "TCP write timeout.") flag.DurationVar(&writeTimeout, "read-timeout", time.Second*300, "TCP read timeout.") @@ -195,6 +204,20 @@ func init() { if trendSamples <= 0 { trendSamples = int(increaseInterval.Seconds()) } + + if notificationListenPort > 0 { // copied from bulk_load_influx/main.go + notif := new(bulk_load.NotifyReceiver) + rpc.Register(notif) + rpc.HandleHTTP() + bulk_load.RegisterHandler(notifyHandler) + l, e := net.Listen("tcp", fmt.Sprintf(":%d", notificationListenPort)) + if e != nil { + log.Fatal("listen error:", e) + } + log.Println("Listening for incoming notification") + notificationServer = &http.Server{} + go notificationServer.Serve(l) + } } func main() { @@ -275,7 +298,7 @@ func main() { wallStart := time.Now() scanRes := make(chan int) - scanClose := make(chan int) + scanClose = make(chan int) responseTimeLimitReached := false timeoutReached := false timeLimit := testDuration.Nanoseconds() > 0 @@ -351,11 +374,7 @@ loop: if timeLimit && tickerQuaters > 3 && !timeoutReached { timeoutReached = true log.Println("Time out reached") - if !scanFinished { - scanClose <- 1 - } else { - log.Println("Scan already finished") - } + terminate() if responseTimeLimit > 0 { //still try to find response time limit respLimitms := float64(responseTimeLimit.Nanoseconds()) / 1e6 @@ -422,16 +441,26 @@ waitLoop: fmt.Println("done shutting down telemetry.") } - if notificationHostPort != "" { - client, err := rpc.DialHTTP("tcp", notificationHostPort) - if err != nil { - log.Println("error: dialing:", err) - } else { - var res int - input := 0 - call := client.Go("NotifyReceiver.Notify", input, &res, nil) - if call.Error != nil { - log.Println("error: calling:", call.Error) + if notificationServer != nil { + fmt.Println("shutting down notification listener...") + notificationServer.Shutdown(context.Background()) + fmt.Println("done shutting down notification listener.") + } + + if notificationHostPort != "" && !externalFinished { + targets := strings.Split(notificationHostPort, ",") + for _, target := range targets { + client, err := rpc.DialHTTP("tcp", target) + if err != nil { + log.Printf("error dialing %s: %v", target, err) + } else { + var res int + input := 0 + call := client.Go("NotifyReceiver.Notify", input, &res, nil) + if call.Error != nil { + log.Printf("error calling %s: %v", target, call.Error) + } + client.Close() } } } @@ -541,7 +570,7 @@ loop: qind++ select { case <-closeChan: - log.Print("Received finish request") + log.Println("Received finish request") break loop default: } @@ -730,3 +759,27 @@ func fprintStats(w io.Writer, statGroups statsMap) { } } } + +func terminate() { + if !scanFinished { + scanClose <- 1 + } else { + log.Println("Scan already finished") + } +} + +func notifyHandler(arg int) (int, error) { + var e error + if arg == 0 { + log.Println("Received external finish request") + if !externalFinished { + externalFinished = true + terminate() + } else { + log.Println("External finish request already received") + } + } else { + e = fmt.Errorf("unknown notification code: %d", arg) + } + return 0, e +}