From 802311614de9fa5c771143840bef4ad13295694d Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 Mar 2019 15:24:07 +0100 Subject: [PATCH 01/29] partial support for toml config --- bulk_data_gen/common/config.go | 126 ++++++++++++++++++++++++++++++++ bulk_data_gen/dashboard/host.go | 13 ++++ cmd/bulk_data_gen/main.go | 15 +++- 3 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 bulk_data_gen/common/config.go diff --git a/bulk_data_gen/common/config.go b/bulk_data_gen/common/config.go new file mode 100644 index 00000000..acef838c --- /dev/null +++ b/bulk_data_gen/common/config.go @@ -0,0 +1,126 @@ +package common + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/pelletier/go-toml" + "log" + "math/rand" + "reflect" +) + +type Source interface{} + +func GetSourceValue(s *Source, measurementName, itemKey string) interface{} { + switch reflect.ValueOf(s).Kind() { + case reflect.Array: + array := (*s).([]interface{}) + return array[rand.Int63n(int64(len(array)))] + case reflect.Map: + log.Fatalf("generators are not supported (yet) ['%s/%s']", measurementName, itemKey) + default: + return *s + } + panic("unreachable") +} + +type Tag struct { + Name string + Source Source +} + +type Field struct { + Count int + Name string + Source Source +} + +type Measurement struct { + Name string + Sample float32 + Tags []Tag + Fields []Field +} + +type ExternalConfig struct { + measurements []Measurement +} + +var Config *ExternalConfig + +func (c *ExternalConfig) String() string { + var buf bytes.Buffer + for _,m := range c.measurements { + buf.WriteString(fmt.Sprintf("definition: %s, sample: %f\n", m.Name, m.Sample)) + buf.WriteString(fmt.Sprintf(" tags:\n")) + for _,tag := range m.Tags { + buf.WriteString(fmt.Sprintf(" tag: %s\n", tag.Name)) + buf.WriteString(fmt.Sprintf(" source: %v\n", tag.Source)) + } + buf.WriteString(fmt.Sprintf(" fields:\n")) + for _,field := range m.Fields { + buf.WriteString(fmt.Sprintf(" field: %s, count: %d\n", field.Name, field.Count)) + buf.WriteString(fmt.Sprintf(" source: %v\n", field.Source)) + } + } + return buf.String() +} + +func (c *ExternalConfig) GetTagBytesValue(measurementName, tagKey []byte, failIfNotFound bool) []byte { + return []byte(c.GetTagValue(string(measurementName), string(tagKey), failIfNotFound)) +} + +func (c *ExternalConfig) GetTagValue(measurementName, tagKey string, failIfNotFound bool) string { + for _,m := range c.measurements { + if "" == measurementName || m.Name == measurementName { + for _,tag := range m.Tags { + if tag.Name == tagKey { + return fmt.Sprintf("%v", GetSourceValue(&tag.Source, m.Name, tag.Name)) + } + } + } + } + if failIfNotFound { + log.Fatalf("value for tag '%s/%s' not found", measurementName, tagKey) + } + return "" +} + +func (c *ExternalConfig) GetFieldBytesValue(measurementName, tagKey []byte, failIfNotFound bool) interface{} { + return c.GetFieldValue(string(measurementName), string(tagKey), failIfNotFound) +} + +func (c *ExternalConfig) GetFieldValue(measurementName, fieldKey string, failIfNotFound bool) interface{} { + for _,m := range c.measurements { + if "" == measurementName || m.Name == measurementName { + for _,field := range m.Fields { + if field.Name == fieldKey { + return GetSourceValue(&field.Source, m.Name, field.Name) + } + } + } + } + if failIfNotFound { + log.Fatalf("value for field '%s/%s' not found", measurementName, fieldKey) + } + return nil +} + +func NewConfig(path string) (*ExternalConfig, error) { + toml, err := toml.LoadFile(path) + if err != nil { + return nil, 
fmt.Errorf("file load failed: %v", err) + } + obj := toml.ToMap()["measurements"] + b, err := json.Marshal(obj) + if err != nil { + return nil, fmt.Errorf("marshall failed: %v", err) + } + config := ExternalConfig{} + err = json.Unmarshal(b, &config.measurements) + if err != nil { + return nil, fmt.Errorf("unmarshall failed: %v", err) + } + return &config, nil +} diff --git a/bulk_data_gen/dashboard/host.go b/bulk_data_gen/dashboard/host.go index f4bab746..5b94f733 100644 --- a/bulk_data_gen/dashboard/host.go +++ b/bulk_data_gen/dashboard/host.go @@ -92,6 +92,19 @@ func NewHost(i int, offset int, start time.Time) Host { SimulatedMeasurements: sm, } + + // partial override from external config + if Config != nil { + h.Region = Config.GetTagBytesValue(nil, devops.MachineTagKeys[1], true) + h.Datacenter = Config.GetTagBytesValue(nil, devops.MachineTagKeys[2], true) + h.Rack = Config.GetTagBytesValue(nil, devops.MachineTagKeys[3], true) + h.OS = Config.GetTagBytesValue(nil, devops.MachineTagKeys[4], true) + h.Arch = Config.GetTagBytesValue(nil, devops.MachineTagKeys[5], true) + h.Service = Config.GetTagBytesValue(nil, devops.MachineTagKeys[7], true) + h.ServiceVersion = Config.GetTagBytesValue(nil, devops.MachineTagKeys[8], true) + h.ServiceEnvironment = Config.GetTagBytesValue(nil, devops.MachineTagKeys[9], true) + } + currentHostIndex++ return h } diff --git a/cmd/bulk_data_gen/main.go b/cmd/bulk_data_gen/main.go index a8f62392..46126363 100644 --- a/cmd/bulk_data_gen/main.go +++ b/cmd/bulk_data_gen/main.go @@ -37,9 +37,9 @@ var ( daemonUrl string dbName string - format string - useCase string - + format string + useCase string + configFile string scaleVar int64 scaleVarOffset int64 samplingInterval time.Duration @@ -67,6 +67,7 @@ func init() { flag.Int64Var(&scaleVar, "scale-var", 1, "Scaling variable specific to the use case.") flag.Int64Var(&scaleVarOffset, "scale-var-offset", 0, "Scaling variable offset specific to the use case.") flag.DurationVar(&samplingInterval, "sampling-interval", devops.EpochDuration, "Simulated sampling interval.") + flag.StringVar(&configFile, "config-file", "", "Simulator config file in TOML format") flag.StringVar(×tampStartStr, "timestamp-start", common.DefaultDateTimeStart, "Beginning timestamp (RFC3339).") flag.StringVar(×tampEndStr, "timestamp-end", common.DefaultDateTimeEnd, "Ending timestamp (RFC3339).") @@ -136,6 +137,14 @@ func main() { common.Seed(seed) + if configFile != "" { + c, err := common.NewConfig(configFile) + if err != nil { + log.Fatalf("external config error: %v", err) + } + common.Config = c + } + out := bufio.NewWriterSize(os.Stdout, 4<<20) defer out.Flush() From a39bc0f7553f0e818efa7f91765b3337d1de4a78 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 Mar 2019 15:54:35 +0100 Subject: [PATCH 02/29] log info about config file --- cmd/bulk_data_gen/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/bulk_data_gen/main.go b/cmd/bulk_data_gen/main.go index 46126363..c13a1698 100644 --- a/cmd/bulk_data_gen/main.go +++ b/cmd/bulk_data_gen/main.go @@ -143,6 +143,7 @@ func main() { log.Fatalf("external config error: %v", err) } common.Config = c + log.Printf("Using config file %s\n", configFile) } out := bufio.NewWriterSize(os.Stdout, 4<<20) From d3ef3d46c90d134952393a6883a58ae0a7abbb51 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 Mar 2019 16:10:45 +0100 Subject: [PATCH 03/29] added datagen config --- bonitoo.toml | 352 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 
From d3ef3d46c90d134952393a6883a58ae0a7abbb51 Mon Sep 17 00:00:00 2001
From: Ales Pour
Date: Mon, 18 Mar 2019 16:10:45 +0100
Subject: [PATCH 03/29] added datagen config

---
 bonitoo.toml | 352 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 352 insertions(+)
 create mode 100644 bonitoo.toml

diff --git a/bonitoo.toml b/bonitoo.toml
new file mode 100644
index 00000000..bfd28cd7
--- /dev/null
+++ b/bonitoo.toml
@@ -0,0 +1,352 @@
+title = "Bonitoo schema"
+
+[[measurements]]
+name = "cpu"
+# fraction of the tag set to sample; 1.0 keeps every combination (the generator's default is 0.5)
+sample = 1.0
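+# Every tag/field "source" below takes one of three forms: a scalar (used as
+# a constant), an array (one element is picked at random per lookup), or an
+# inline table describing a generator. As of this patch the generator form is
+# rejected by bulk_data_gen/common/config.go ("generators are not supported
+# (yet)"); PATCH 11 adds the first one, { type = "default" }.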
= "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "serial", source = "parallel" }, + +] +fields = [ + { name = "io_time", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "read_bytes", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "read_time", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "reads", count = 8640, source = 494785 }, + { name = "write_bytes", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "write_time", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "writes", count = 8640, source = [10, 20, 15, 19] }, +] + +[[measurements]] +name = "kernel" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + + # specific tags +] +fields = [ + { name = "boot_time", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "context_switches", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "disk_pages_in", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "disk_pages_out", count = 8640, source = 494785 }, + { name = "interrupts", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "processes_forked", count = 8640, source = [100, 1000, 1102, 3049] }, +] + +[[measurements]] +name = "net" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "interface", source = "eth1" } +] + +fields = [ + { name = "evicted_keys", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "expired_keys", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "instantaneous_input_kbps", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "instantaneous_ops_per_sec", count = 8640, source = 494785 }, + { name = "instantaneous_output_kbps", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "keyspace_hits", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "keyspace_misses", count = 8640, source = [100, 1000, 1102, 3049] }, +] + +# example splitting mem into different sets, for example differentiating by os and datacenter +# +# NOTE: fields types are verified to be the same type across all definitions of a single measurement +[[measurements]] +name = "mem" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", 
format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags +] +fields = [ + { name = "available", count = 8640, source = 3325 }, + { name = "available_percent", count = 8640, source = 1985 }, + { name = "buffered", count = 8640, source = 9575621 }, + { name = "buffered_percent", count = 8640, source = 489934 }, + { name = "cached", count = 8640, source = 100000 }, + { name = "free", count = 8640, source = [123, 10, 102, 349] }, + { name = "total", count = 8640, source = [12, 22, 95, 229] }, + { name = "used", count = 8640, source = [15, 29, 55, 339] }, + { name = "used_percent", count = 8640, source = [1, 21, 35, 189] }, +] + + +[[measurements]] +name = "nginx" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "port", source = "8888" }, + { name = "server", source = "server-02" } +] +fields = [ + { name = "accepts", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "active", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "handled", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "reading", count = 8640, source = 494785 }, + { name = "requests", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "waiting", count = 8640, source = [100, 1000, 1102, 3049] }, +] + +[[measurements]] +name = "postgresl" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" } + + # specific tags +] +fields = [ + { name = "blk_read_time", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "blk_write_time", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "blks_hit", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "blks_read", count = 8640, source = 494785 }, + { name = "conflicts", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "deadlocks", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "numbackends", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "temp_bytes", count = 8640, source 
= [49854,10,58873,598,111,333,5] }, + { name = "temp_files", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "tup_deleted", count = 8640, source = 494785 }, + { name = "tup_fetched", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "tup_inserted", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "tup_returned", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "tup_updated", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "xact_commit", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "xact_rollback", count = 8640, source = 494785 }, +] + +[[measurements]] +name = "redis" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "port", source = "6379" }, + { name = "server", source = "redis_02" }, +] +fields = [ + { name = "connected_clients", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "connected_slaves", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "evicted_keys", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "expired_keys", count = 8640, source = 494785 }, + { name = "instantaneous_input_kbps", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "instantaneous_ops_per_sec", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "instantaneous_output_kbps", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "keyspace_hits", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "keyspace_misses", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "latest_fork_usec", count = 8640, source = 494785 }, + { name = "master_repl_offset", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "mem_fragmentation_ratio", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "pubsub_channels", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "pubsub_patterns", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "rdb_changes_since_last_save", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "repl_backlog_active", count = 8640, source = 494785 }, + { name = "repl_backlog_histlen", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "repl_backlog_size", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "sync_full", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "sync_partial_err", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "sync_partial_ok", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "total_connections_received", count = 8640, source = 494785 }, + { name = "uptime_in_seconds", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "used_cpu_sys", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "used_cpu_sys_children", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name 
= "used_cpu_user", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "used_cpu_user_children", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "used_memory", count = 8640, source = 494785 }, + { name = "used_memory_lua", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "used_memory_peak", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "used_memory_rss", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, +] + +[[measurements]] +name = "status" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + + # specific tags +] +fields = [ + { name = "service_up", count = 8640, source = [0,1,1,1,1,1,1,1,1,1,0,0,0,1,1,1] }, +] + +[[measurements]] +name = "system" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" } + + # specific tags +] +fields = [ + { name = "load1", count = 8640, source = { type = "zipf", s = 5, v = 4, imax = 4, seed = 22 } }, + { name = "load15", count = 8640, source = { type = "zipf", s = 3, v = 4, imax = 4, seed = 23 } }, + { name = "load5", count = 8640, source = { type = "zipf", s = 7, v = 4, imax = 4, seed = 24 } }, + { name = "n_cpus", count = 8640, source = 8 }, +] From 37296939045847c6beaeb13046b7e22f23bb52f5 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 Mar 2019 17:45:41 +0100 Subject: [PATCH 04/29] do not fail on config error --- cmd/bulk_data_gen/main.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/bulk_data_gen/main.go b/cmd/bulk_data_gen/main.go index c13a1698..becd9cdd 100644 --- a/cmd/bulk_data_gen/main.go +++ b/cmd/bulk_data_gen/main.go @@ -140,10 +140,11 @@ func main() { if configFile != "" { c, err := common.NewConfig(configFile) if err != nil { - log.Fatalf("external config error: %v", err) + log.Printf("Warning: external config error: %v", err) + } else { + common.Config = c + log.Printf("Using config file %s\n", configFile) } - common.Config = c - log.Printf("Using config file %s\n", configFile) } out := bufio.NewWriterSize(os.Stdout, 4<<20) From ee16f47e57aa10af4282ef340f2d3143f4e50824 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 18 Mar 2019 17:46:50 +0100 Subject: [PATCH 05/29] reverting previous commit --- cmd/bulk_data_gen/main.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/bulk_data_gen/main.go b/cmd/bulk_data_gen/main.go index becd9cdd..c13a1698 100644 --- a/cmd/bulk_data_gen/main.go +++ 
b/cmd/bulk_data_gen/main.go @@ -140,11 +140,10 @@ func main() { if configFile != "" { c, err := common.NewConfig(configFile) if err != nil { - log.Printf("Warning: external config error: %v", err) - } else { - common.Config = c - log.Printf("Using config file %s\n", configFile) + log.Fatalf("external config error: %v", err) } + common.Config = c + log.Printf("Using config file %s\n", configFile) } out := bufio.NewWriterSize(os.Stdout, 4<<20) From 0c465115257e5eb16d37f9594270faf4cd240da1 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 21 Mar 2019 09:45:08 +0100 Subject: [PATCH 06/29] wait until rt is only 2*threshold --- cmd/query_benchmarker_influxdb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 810a1442..690ef7a2 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -328,7 +328,7 @@ loop: } } case <-responseTicker.C: - if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*3 < int64(movingAverageStat.Avg()*1e6) { + if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*2 < int64(movingAverageStat.Avg()*1e6) { responseTimeLimitReached = true scanClose <- 1 respLimitms := float64(responseTimeLimit.Nanoseconds()) / 1e6 From e81a9d9a844cff7ee7a28f2d5a2c89f870807885 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 22 Mar 2019 13:02:56 +0100 Subject: [PATCH 07/29] reverting previous --- cmd/query_benchmarker_influxdb/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 690ef7a2..810a1442 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -328,7 +328,7 @@ loop: } } case <-responseTicker.C: - if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*2 < int64(movingAverageStat.Avg()*1e6) { + if !responseTimeLimitReached && responseTimeLimit > 0 && responseTimeLimit.Nanoseconds()*3 < int64(movingAverageStat.Avg()*1e6) { responseTimeLimitReached = true scanClose <- 1 respLimitms := float64(responseTimeLimit.Nanoseconds()) / 1e6 From c168800c8c03d76e48ba7933f3dbab9511a0e57f Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 29 Mar 2019 12:06:29 +0100 Subject: [PATCH 08/29] Adding debug info --- cmd/bulk_load_influx/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index f956b902..a6ad8bcf 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -337,8 +337,9 @@ func main() { go func(w int) { err := processBatches(NewHTTPWriter(cfg, consistency), backingOffChans[w], telemetryChanPoints, fmt.Sprintf("%d", w)) if err != nil { - fmt.Println(err.Error()) + log.Printf("Worker %d: error: %s\n", w, err.Error()) once.Do(func() { + log.Printf("Worker %d: preparing exit\n", w) endedPrematurely = true prematureEndReason = "Worker error" if !scanFinished { @@ -347,6 +348,7 @@ func main() { //read out remaining batches } }() + log.Printf("Worker %d: Finishing scan\n", w) syncChanDone <- 1 } exitCode = 1 @@ -564,6 +566,7 @@ outer: } } scanFinished = true + log.Println("Scan finished") return itemsRead, bytesRead, totalValues } From 989c40d734fea63467e234e81853e2072d9fa346 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 29 Mar 2019 17:30:56 +0100 Subject: [PATCH 09/29] Adding time 
out for backoff

---
 cmd/bulk_load_influx/main.go | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go
index a6ad8bcf..3d7c284d 100644
--- a/cmd/bulk_load_influx/main.go
+++ b/cmd/bulk_load_influx/main.go
@@ -54,6 +54,7 @@ var (
 	batchSize        int
 	ingestRateLimit  int
 	backoff          time.Duration
+	backoffTimeOut   time.Duration
 	timeLimit        time.Duration
 	progressInterval time.Duration
 	doLoad           bool
@@ -130,6 +131,7 @@ func init() {
 	flag.IntVar(&ingestRateLimit, "ingest-rate-limit", -1, "Ingest rate limit in values/s (-1 = no limit).")
 	flag.Int64Var(&itemLimit, "item-limit", -1, "Number of items to read from stdin before quitting. (1 item per 1 line of input.)")
 	flag.DurationVar(&backoff, "backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.")
+	flag.DurationVar(&backoffTimeOut, "backoff-timeout", time.Minute*30, "Maximum time to spend dealing with backoff messages in one shot.")
 	flag.DurationVar(&timeLimit, "time-limit", -1, "Maximum duration to run (-1 is the default: no limit).")
 	flag.DurationVar(&progressInterval, "progress-interval", -1, "Duration between printing progress messages.")
 	flag.BoolVar(&useGzip, "gzip", true, "Whether to gzip encode requests (default true).")
@@ -211,6 +213,10 @@ func init() {
 	if trendSamples <= 0 {
 		trendSamples = int(movingAverageInterval.Seconds())
 	}
+
+	if timeLimit > 0 && backoffTimeOut > timeLimit {
+		backoffTimeOut = timeLimit
+	}
 }
 
 func notifyHandler(arg int) (int, error) {
@@ -382,11 +388,12 @@ func main() {
 
 	close(batchChan)
 	close(syncChanDone)
+	log.Println("Waiting for workers")
 	workersGroup.Wait()
 
 	close(statChan)
 	statGroup.Wait()
-
+	log.Println("Closing backoff handlers")
 	for i := range backingOffChans {
 		close(backingOffChans[i])
 		<-backingOffDones[i]
 	}
@@ -594,6 +601,7 @@ func processBatches(w *HTTPWriter, backoffSrc chan bool, telemetrySink chan *rep
 	if doLoad {
 		var err error
 		sleepTime := backoff
+		timeStart := time.Now()
 		for {
 			if useGzip {
 				compressedBatch := bufPool.Get().(*bytes.Buffer)
@@ -628,6 +636,11 @@ func processBatches(w *HTTPWriter, backoffSrc chan bool, telemetrySink chan *rep
 				log.Printf("[worker %s] sleeping on backoff response way too long (10x %v)", telemetryWorkerLabel, backoff)
 				sleepTime = 10 * backoff
 			}
+			checkTime := time.Now()
+			if timeStart.Add(backoffTimeOut).Before(checkTime) {
+				log.Printf("[worker %s] Spent too much time in backoff: %ds\n", telemetryWorkerLabel, int64(checkTime.Sub(timeStart).Seconds()))
+				break
+			}
 		} else {
 			backoffSrc <- false
 			break
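Distilled, the loop PATCH 09 amends implements "retry while the server signals backpressure, but only within a wall-clock budget". A self-contained sketch under that reading; send is a hypothetical stand-in for the real batch write inside processBatches, and the sleep-growth step is only suggested by the visible hunks:

package main

import (
	"log"
	"time"
)

// send stands in for one attempt to write a batch; it reports whether the
// server responded with a backpressure ("backoff") status. Hypothetical
// helper, not part of the patch.
func send() (backpressure bool, err error) { return false, nil }

// writeWithBackoff retries a batch while the server signals backpressure,
// but never spends more than backoffTimeOut doing so -- the behaviour the
// -backoff-timeout flag above introduces.
func writeWithBackoff(backoff, backoffTimeOut time.Duration) error {
	sleepTime := backoff
	timeStart := time.Now()
	for {
		backpressure, err := send()
		if err != nil {
			return err
		}
		if !backpressure {
			return nil // batch accepted
		}
		time.Sleep(sleepTime)
		// Grow the sleep between attempts; the patched code caps it at
		// ten times the configured backoff.
		sleepTime += backoff
		if sleepTime > 10*backoff {
			sleepTime = 10 * backoff
		}
		if time.Since(timeStart) > backoffTimeOut {
			// Same escape hatch as the patch: stop waiting and move on.
			log.Printf("spent too much time in backoff: %v", time.Since(timeStart))
			return nil
		}
	}
}

func main() {
	if err := writeWithBackoff(time.Second, 30*time.Minute); err != nil {
		log.Fatal(err)
	}
}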
From 46b6e538ebc764d994c00ec5f3285e2bbf89da2b Mon Sep 17 00:00:00 2001
From: Ales Pour
Date: Tue, 2 Apr 2019 10:27:17 +0200
Subject: [PATCH 10/29] measurement-specific flags use config where set

---
 bulk_data_gen/devops/devops_disk.go   | 4 ++++
 bulk_data_gen/devops/devops_diskio.go | 3 +++
 bulk_data_gen/devops/devops_net.go    | 3 +++
 bulk_data_gen/devops/devops_nginx.go  | 4 ++++
 bulk_data_gen/devops/devops_redis.go  | 4 ++++
 5 files changed, 18 insertions(+)

diff --git a/bulk_data_gen/devops/devops_disk.go b/bulk_data_gen/devops/devops_disk.go
index 2d7475c2..d918fa3c 100644
--- a/bulk_data_gen/devops/devops_disk.go
+++ b/bulk_data_gen/devops/devops_disk.go
@@ -44,6 +44,10 @@ func NewDiskMeasurement(start time.Time, sda int) *DiskMeasurement {
 	}
 	path := []byte(fmt.Sprintf("/dev/sda%d", sda))
 	fsType := DiskFSTypeChoices[rand.Intn(len(DiskFSTypeChoices))]
+	if Config != nil { // partial override from external config
+		path = Config.GetTagBytesValue(DiskByteString, DiskTags[0], true)
+		fsType = Config.GetTagBytesValue(DiskByteString, DiskTags[1], true)
+	}
 	return &DiskMeasurement{
 		path:   path,
 		fsType: fsType,
diff --git a/bulk_data_gen/devops/devops_diskio.go b/bulk_data_gen/devops/devops_diskio.go
index 33606e7f..2c9a4dde 100644
--- a/bulk_data_gen/devops/devops_diskio.go
+++ b/bulk_data_gen/devops/devops_diskio.go
@@ -36,6 +36,9 @@ func NewDiskIOMeasurement(start time.Time) *DiskIOMeasurement {
 	}
 	serial := []byte(fmt.Sprintf("%03d-%03d-%03d", rand.Intn(1000), rand.Intn(1000), rand.Intn(1000)))
+	if Config != nil { // partial override from external config
+		serial = Config.GetTagBytesValue(DiskIOByteString, SerialByteString, true)
+	}
 	return &DiskIOMeasurement{
 		serial: serial,
diff --git a/bulk_data_gen/devops/devops_net.go b/bulk_data_gen/devops/devops_net.go
index 1c237676..162ec8cd 100644
--- a/bulk_data_gen/devops/devops_net.go
+++ b/bulk_data_gen/devops/devops_net.go
@@ -40,6 +40,9 @@ func NewNetMeasurement(start time.Time) *NetMeasurement {
 	}
 	interfaceName := []byte(fmt.Sprintf("eth%d", rand.Intn(4)))
+	if Config != nil { // partial override from external config
+		interfaceName = Config.GetTagBytesValue(NetByteString, NetTags[0], true)
+	}
 	return &NetMeasurement{
 		interfaceName: interfaceName,
diff --git a/bulk_data_gen/devops/devops_nginx.go b/bulk_data_gen/devops/devops_nginx.go
index 2dcd9eed..a48ad863 100644
--- a/bulk_data_gen/devops/devops_nginx.go
+++ b/bulk_data_gen/devops/devops_nginx.go
@@ -41,6 +41,10 @@ func NewNginxMeasurement(start time.Time) *NginxMeasurement {
 	serverName := []byte(fmt.Sprintf("nginx_%d", rand.Intn(100000)))
 	port := []byte(fmt.Sprintf("%d", rand.Intn(20000)+1024))
+	if Config != nil { // partial override from external config
+		serverName = Config.GetTagBytesValue(NginxByteString, NginxTags[1], true)
+		port = Config.GetTagBytesValue(NginxByteString, NginxTags[0], true)
+	}
 	return &NginxMeasurement{
 		port:       port,
 		serverName: serverName,
diff --git a/bulk_data_gen/devops/devops_redis.go b/bulk_data_gen/devops/devops_redis.go
index 00e5f558..1e06f37f 100644
--- a/bulk_data_gen/devops/devops_redis.go
+++ b/bulk_data_gen/devops/devops_redis.go
@@ -76,6 +76,10 @@ func NewRedisMeasurement(start time.Time) *RedisMeasurement {
 	serverName := []byte(fmt.Sprintf("redis_%d", rand.Intn(100000)))
 	port := []byte(fmt.Sprintf("%d", rand.Intn(20000)+1024))
+	if Config != nil { // partial override from external config
+		serverName = Config.GetTagBytesValue(RedisByteString, RedisTags[1], true)
+		port = Config.GetTagBytesValue(RedisByteString, RedisTags[0], true)
+	}
 	return &RedisMeasurement{
 		port:       port,
 		serverName: serverName,

From 6b9caa1e655f22723f81bdd16034ae4a6a8285ba Mon Sep 17 00:00:00 2001
From: Ales Pour
Date: Thu, 4 Apr 2019 17:06:48 +0200
Subject: [PATCH 11/29] added support for using default (already set) values for tags

---
 bulk_data_gen/common/config.go        | 34 +++++++++++++++++----------
 bulk_data_gen/dashboard/host.go       | 16 ++++++-------
 bulk_data_gen/devops/devops_disk.go   |  4 ++--
 bulk_data_gen/devops/devops_diskio.go |  2 +-
 bulk_data_gen/devops/devops_net.go    |  2 +-
 bulk_data_gen/devops/devops_nginx.go  |  4 ++--
 bulk_data_gen/devops/devops_redis.go  |  4 ++--
 7 files changed, 37 insertions(+), 29 deletions(-)

diff --git a/bulk_data_gen/common/config.go b/bulk_data_gen/common/config.go
index acef838c..f9c83902 100644
--- a/bulk_data_gen/common/config.go
+++ b/bulk_data_gen/common/config.go
@@ -12,14 +12,22 @@ import (
 
 type Source interface{}
 
-func GetSourceValue(s *Source, measurementName,
itemKey string) interface{} { - switch reflect.ValueOf(s).Kind() { +var DefaultValueGenerator = map[string]interface{}{ + "type": "default", +} + +func getSourceValue(s *Source, measurementName, itemKey string, itemDefaultValue interface{}) interface{} { + switch reflect.Indirect(reflect.ValueOf(s)).Elem().Kind() { case reflect.Array: array := (*s).([]interface{}) return array[rand.Int63n(int64(len(array)))] case reflect.Map: + m := (*s).(map[string]interface{}) + if reflect.DeepEqual(m, DefaultValueGenerator) { + return itemDefaultValue + } log.Fatalf("generators are not supported (yet) ['%s/%s']", measurementName, itemKey) - default: + default: // primitive types return *s } panic("unreachable") @@ -67,16 +75,16 @@ func (c *ExternalConfig) String() string { return buf.String() } -func (c *ExternalConfig) GetTagBytesValue(measurementName, tagKey []byte, failIfNotFound bool) []byte { - return []byte(c.GetTagValue(string(measurementName), string(tagKey), failIfNotFound)) +func (c *ExternalConfig) GetTagBytesValue(measurementName, tagKey []byte, failIfNotFound bool, defaultValue []byte) []byte { + return []byte(c.GetTagValue(string(measurementName), string(tagKey), failIfNotFound, string(defaultValue))) } -func (c *ExternalConfig) GetTagValue(measurementName, tagKey string, failIfNotFound bool) string { +func (c *ExternalConfig) GetTagValue(measurementName, tagKey string, failIfNotFound bool, defaultValue string) string { for _,m := range c.measurements { if "" == measurementName || m.Name == measurementName { for _,tag := range m.Tags { if tag.Name == tagKey { - return fmt.Sprintf("%v", GetSourceValue(&tag.Source, m.Name, tag.Name)) + return fmt.Sprintf("%v", getSourceValue(&tag.Source, m.Name, tag.Name, defaultValue)) } } } @@ -84,19 +92,19 @@ func (c *ExternalConfig) GetTagValue(measurementName, tagKey string, failIfNotFo if failIfNotFound { log.Fatalf("value for tag '%s/%s' not found", measurementName, tagKey) } - return "" + return "" // defaultValue ? } -func (c *ExternalConfig) GetFieldBytesValue(measurementName, tagKey []byte, failIfNotFound bool) interface{} { - return c.GetFieldValue(string(measurementName), string(tagKey), failIfNotFound) +func (c *ExternalConfig) GetFieldBytesValue(measurementName, tagKey []byte, failIfNotFound bool, defaultValue interface{}) interface{} { + return c.GetFieldValue(string(measurementName), string(tagKey), failIfNotFound, defaultValue) } -func (c *ExternalConfig) GetFieldValue(measurementName, fieldKey string, failIfNotFound bool) interface{} { +func (c *ExternalConfig) GetFieldValue(measurementName, fieldKey string, failIfNotFound bool, defaultValue interface{}) interface{} { for _,m := range c.measurements { if "" == measurementName || m.Name == measurementName { for _,field := range m.Fields { if field.Name == fieldKey { - return GetSourceValue(&field.Source, m.Name, field.Name) + return getSourceValue(&field.Source, m.Name, field.Name, defaultValue) } } } @@ -104,7 +112,7 @@ func (c *ExternalConfig) GetFieldValue(measurementName, fieldKey string, failIfN if failIfNotFound { log.Fatalf("value for field '%s/%s' not found", measurementName, fieldKey) } - return nil + return nil // defaultValue ? 
} func NewConfig(path string) (*ExternalConfig, error) { diff --git a/bulk_data_gen/dashboard/host.go b/bulk_data_gen/dashboard/host.go index 5b94f733..dddb6b38 100644 --- a/bulk_data_gen/dashboard/host.go +++ b/bulk_data_gen/dashboard/host.go @@ -95,14 +95,14 @@ func NewHost(i int, offset int, start time.Time) Host { // partial override from external config if Config != nil { - h.Region = Config.GetTagBytesValue(nil, devops.MachineTagKeys[1], true) - h.Datacenter = Config.GetTagBytesValue(nil, devops.MachineTagKeys[2], true) - h.Rack = Config.GetTagBytesValue(nil, devops.MachineTagKeys[3], true) - h.OS = Config.GetTagBytesValue(nil, devops.MachineTagKeys[4], true) - h.Arch = Config.GetTagBytesValue(nil, devops.MachineTagKeys[5], true) - h.Service = Config.GetTagBytesValue(nil, devops.MachineTagKeys[7], true) - h.ServiceVersion = Config.GetTagBytesValue(nil, devops.MachineTagKeys[8], true) - h.ServiceEnvironment = Config.GetTagBytesValue(nil, devops.MachineTagKeys[9], true) + h.Region = Config.GetTagBytesValue(nil, devops.MachineTagKeys[1], true, []byte(h.Region)) + h.Datacenter = Config.GetTagBytesValue(nil, devops.MachineTagKeys[2], true, []byte(h.Datacenter)) + h.Rack = Config.GetTagBytesValue(nil, devops.MachineTagKeys[3], true, []byte(h.Rack)) + h.OS = Config.GetTagBytesValue(nil, devops.MachineTagKeys[4], true, []byte(h.OS)) + h.Arch = Config.GetTagBytesValue(nil, devops.MachineTagKeys[5], true, []byte(h.Arch)) + h.Service = Config.GetTagBytesValue(nil, devops.MachineTagKeys[7], true, []byte(h.Service)) + h.ServiceVersion = Config.GetTagBytesValue(nil, devops.MachineTagKeys[8], true, []byte(h.ServiceVersion)) + h.ServiceEnvironment = Config.GetTagBytesValue(nil, devops.MachineTagKeys[9], true, []byte(h.ServiceEnvironment)) } currentHostIndex++ diff --git a/bulk_data_gen/devops/devops_disk.go b/bulk_data_gen/devops/devops_disk.go index d918fa3c..cb528f87 100644 --- a/bulk_data_gen/devops/devops_disk.go +++ b/bulk_data_gen/devops/devops_disk.go @@ -45,8 +45,8 @@ func NewDiskMeasurement(start time.Time, sda int) *DiskMeasurement { path := []byte(fmt.Sprintf("/dev/sda%d", sda)) fsType := DiskFSTypeChoices[rand.Intn(len(DiskFSTypeChoices))] if Config != nil { // partial override from external config - path = Config.GetTagBytesValue(DiskByteString, DiskTags[0], true) - fsType = Config.GetTagBytesValue(DiskByteString, DiskTags[1], true) + path = Config.GetTagBytesValue(DiskByteString, DiskTags[0], true, path) + fsType = Config.GetTagBytesValue(DiskByteString, DiskTags[1], true, fsType) } return &DiskMeasurement{ path: path, diff --git a/bulk_data_gen/devops/devops_diskio.go b/bulk_data_gen/devops/devops_diskio.go index 2c9a4dde..ba56517f 100644 --- a/bulk_data_gen/devops/devops_diskio.go +++ b/bulk_data_gen/devops/devops_diskio.go @@ -37,7 +37,7 @@ func NewDiskIOMeasurement(start time.Time) *DiskIOMeasurement { serial := []byte(fmt.Sprintf("%03d-%03d-%03d", rand.Intn(1000), rand.Intn(1000), rand.Intn(1000))) if Config != nil { // partial override from external config - serial = Config.GetTagBytesValue(DiskIOByteString, SerialByteString, true) + serial = Config.GetTagBytesValue(DiskIOByteString, SerialByteString, true, serial) } return &DiskIOMeasurement{ serial: serial, diff --git a/bulk_data_gen/devops/devops_net.go b/bulk_data_gen/devops/devops_net.go index 162ec8cd..4ebdb77c 100644 --- a/bulk_data_gen/devops/devops_net.go +++ b/bulk_data_gen/devops/devops_net.go @@ -41,7 +41,7 @@ func NewNetMeasurement(start time.Time) *NetMeasurement { interfaceName := []byte(fmt.Sprintf("eth%d", 
rand.Intn(4)))
 	if Config != nil { // partial override from external config
-		interfaceName = Config.GetTagBytesValue(NetByteString, NetTags[0], true)
+		interfaceName = Config.GetTagBytesValue(NetByteString, NetTags[0], true, interfaceName)
 	}
 	return &NetMeasurement{
 		interfaceName: interfaceName,
diff --git a/bulk_data_gen/devops/devops_nginx.go b/bulk_data_gen/devops/devops_nginx.go
index a48ad863..b7cba429 100644
--- a/bulk_data_gen/devops/devops_nginx.go
+++ b/bulk_data_gen/devops/devops_nginx.go
@@ -42,8 +42,8 @@ func NewNginxMeasurement(start time.Time) *NginxMeasurement {
 	serverName := []byte(fmt.Sprintf("nginx_%d", rand.Intn(100000)))
 	port := []byte(fmt.Sprintf("%d", rand.Intn(20000)+1024))
 	if Config != nil { // partial override from external config
-		serverName = Config.GetTagBytesValue(NginxByteString, NginxTags[1], true)
-		port = Config.GetTagBytesValue(NginxByteString, NginxTags[0], true)
+		serverName = Config.GetTagBytesValue(NginxByteString, NginxTags[1], true, serverName)
+		port = Config.GetTagBytesValue(NginxByteString, NginxTags[0], true, port)
 	}
 	return &NginxMeasurement{
 		port: port,
diff --git a/bulk_data_gen/devops/devops_redis.go b/bulk_data_gen/devops/devops_redis.go
index 1e06f37f..85fff777 100644
--- a/bulk_data_gen/devops/devops_redis.go
+++ b/bulk_data_gen/devops/devops_redis.go
@@ -77,8 +77,8 @@ func NewRedisMeasurement(start time.Time) *RedisMeasurement {
 	serverName := []byte(fmt.Sprintf("redis_%d", rand.Intn(100000)))
 	port := []byte(fmt.Sprintf("%d", rand.Intn(20000)+1024))
 	if Config != nil { // partial override from external config
-		serverName = Config.GetTagBytesValue(RedisByteString, RedisTags[1], true)
-		port = Config.GetTagBytesValue(RedisByteString, RedisTags[0], true)
+		serverName = Config.GetTagBytesValue(RedisByteString, RedisTags[1], true, serverName)
+		port = Config.GetTagBytesValue(RedisByteString, RedisTags[0], true, port)
 	}
 	return &RedisMeasurement{
 		port: port,
 		serverName: serverName,

From 87bb24a2b5837181e6ffc3b60c08cef847ea7457 Mon Sep 17 00:00:00 2001
From: Ales Pour
Date: Thu, 4 Apr 2019 17:07:22 +0200
Subject: [PATCH 12/29] config for performance-problematic setup

---
 bonitoo-mix.toml | 352 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 352 insertions(+)
 create mode 100644 bonitoo-mix.toml

diff --git a/bonitoo-mix.toml b/bonitoo-mix.toml
new file mode 100644
index 00000000..740fd20a
--- /dev/null
+++ b/bonitoo-mix.toml
@@ -0,0 +1,352 @@
+title = "Bonitoo schema"
+
+[[measurements]]
+name = "cpu"
+# fraction of the tag set to sample; 1.0 keeps every combination (the generator's default is 0.5)
+sample = 1.0
+tags = [
+    # common tags
+    { name = "arch", source = "x86" },
+    { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } },
+    { name = "datacenter", source = "virginia" },
+    { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] },
+    { name = "os", source = "Ubuntu15.10" },
+    { name = "rack", source = "1" },
+    { name = "region", source = "us-west-01" },
+    { name = "service", source = "9" },
+    { name = "service_environment", source = "test" },
+    { name = "service_version", source = "2" }
+    # specific tags
+]
+fields = [
+    { name = "usage_guest", count = 8640, source = { type = "rand", seed = 10 } },
+    { name = "usage_guest_nice", count = 8640, source = { type = "rand", seed = 11 } },
+    { name = "usage_idle", count = 8640, source = { type = "rand", seed = 12 } },
+    { name = "usage_iowait", count = 8640, source = { type = "rand", seed = 13 } },
+    { name = "usage_irq", count = 8640, source = {
type = "rand", seed = 14 } }, + { name = "usage_nice", count = 8640, source = { type = "rand", seed = 15 } }, + { name = "usage_softirq", count = 8640, source = { type = "rand", seed = 16 } }, + { name = "usage_steal", count = 8640, source = { type = "rand", seed = 17 } }, + { name = "usage_system", count = 8640, source = { type = "rand", seed = 18 } }, + { name = "usage_user", count = 8640, source = { type = "rand", seed = 19 } }, +] + +[[measurements]] +name = "disk" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "fstype", source = { type = "default" } }, + { name = "path", source = { type = "default" } } +] +fields = [ + { name = "free", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "inodes_free", count = 8640, source = [3,5,5,6,7,9,1,2,9,6] }, + { name = "inodes_total", count = 8640, source = [1,8,5,3,9,1,2,2,3,6] }, + { name = "inodes_used", count = 8640, source = [7,4,5,9,1,2,3,6,2,1] }, + { name = "total", count = 8640, source = [2,3,6,5,3,5,6,7,9,1] }, + { name = "used", count = 8640, source = [1,5,6,7,9,2,3,6,3,5] }, + { name = "used_percent", count = 8640, source = [1,2,3,6,5,6,7,9,5,3] }, +] + +[[measurements]] +name = "diskio" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "serial", source = { type = "default" } }, + +] +fields = [ + { name = "io_time", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "read_bytes", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "read_time", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "reads", count = 8640, source = 494785 }, + { name = "write_bytes", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "write_time", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "writes", count = 8640, source = [10, 20, 15, 19] }, +] + +[[measurements]] +name = "kernel" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = 
"service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + + # specific tags +] +fields = [ + { name = "boot_time", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "context_switches", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "disk_pages_in", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "disk_pages_out", count = 8640, source = 494785 }, + { name = "interrupts", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "processes_forked", count = 8640, source = [100, 1000, 1102, 3049] }, +] + +[[measurements]] +name = "net" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "interface", source = { type = "default" } } +] + +fields = [ + { name = "evicted_keys", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "expired_keys", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "instantaneous_input_kbps", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "instantaneous_ops_per_sec", count = 8640, source = 494785 }, + { name = "instantaneous_output_kbps", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "keyspace_hits", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "keyspace_misses", count = 8640, source = [100, 1000, 1102, 3049] }, +] + +# example splitting mem into different sets, for example differentiating by os and datacenter +# +# NOTE: fields types are verified to be the same type across all definitions of a single measurement +[[measurements]] +name = "mem" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags +] +fields = [ + { name = "available", count = 8640, source = 3325 }, + { name = "available_percent", count = 8640, source = 1985 }, + { name = "buffered", count = 8640, source = 9575621 }, + { name = "buffered_percent", count = 8640, source = 489934 }, + { name = "cached", count = 8640, source = 100000 }, + { name = "free", count = 8640, source = [123, 10, 102, 349] }, + { name = "total", count = 8640, source = [12, 22, 95, 229] }, + { name = "used", count = 8640, source = [15, 29, 55, 339] }, + { name = "used_percent", count = 8640, source = [1, 21, 35, 189] }, +] + + +[[measurements]] +name = "nginx" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", 
format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "port", source = { type = "default" } }, + { name = "server", source = { type = "default" } } +] +fields = [ + { name = "accepts", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "active", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "handled", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "reading", count = 8640, source = 494785 }, + { name = "requests", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "waiting", count = 8640, source = [100, 1000, 1102, 3049] }, +] + +[[measurements]] +name = "postgresl" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" } + + # specific tags +] +fields = [ + { name = "blk_read_time", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "blk_write_time", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "blks_hit", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "blks_read", count = 8640, source = 494785 }, + { name = "conflicts", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "deadlocks", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "numbackends", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "temp_bytes", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "temp_files", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "tup_deleted", count = 8640, source = 494785 }, + { name = "tup_fetched", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "tup_inserted", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "tup_returned", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "tup_updated", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "xact_commit", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "xact_rollback", count = 8640, source = 494785 }, +] + +[[measurements]] +name = "redis" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = 
"service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + # specific tags + { name = "port", source = { type = "default" } }, + { name = "server", source = { type = "default" } }, +] +fields = [ + { name = "connected_clients", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "connected_slaves", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "evicted_keys", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "expired_keys", count = 8640, source = 494785 }, + { name = "instantaneous_input_kbps", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "instantaneous_ops_per_sec", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "instantaneous_output_kbps", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "keyspace_hits", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "keyspace_misses", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "latest_fork_usec", count = 8640, source = 494785 }, + { name = "master_repl_offset", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "mem_fragmentation_ratio", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "pubsub_channels", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "pubsub_patterns", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "rdb_changes_since_last_save", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "repl_backlog_active", count = 8640, source = 494785 }, + { name = "repl_backlog_histlen", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "repl_backlog_size", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "sync_full", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "sync_partial_err", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "sync_partial_ok", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "total_connections_received", count = 8640, source = 494785 }, + { name = "uptime_in_seconds", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "used_cpu_sys", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "used_cpu_sys_children", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, + { name = "used_cpu_user", count = 8640, source = [49854,10,58873,598,111,333,5] }, + { name = "used_cpu_user_children", count = 8640, source = [4957,120,10475,847457,44646,2222] }, + { name = "used_memory", count = 8640, source = 494785 }, + { name = "used_memory_lua", count = 8640, source = [14957,1120,110475,1847457,144646,222] }, + { name = "used_memory_peak", count = 8640, source = [100, 1000, 1102, 3049] }, + { name = "used_memory_rss", count = 8640, source = [5,6,7,9,1,2,3,6,5,3] }, +] + +[[measurements]] +name = "status" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" }, + + + # specific tags +] +fields = [ + { name = 
"service_up", count = 8640, source = [0,1,1,1,1,1,1,1,1,1,0,0,0,1,1,1] }, +] + +[[measurements]] +name = "system" +sample = 1.0 +tags = [ + # common tags + { name = "arch", source = "x86" }, + { name = "cluster_id", source = { type = "sequence", format = "%s", start = 0, count = 10 } }, + { name = "datacenter", source = "virginia" }, + { name = "hostname", source = ["meta_1","meta_2","meta_3","data_1","data_2","data_3","data_4","data_5","data_6","data_7"] }, + { name = "os", source = "Ubuntu15.10" }, + { name = "rack", source = "1" }, + { name = "region", source = "us-west-01" }, + { name = "service", source = "9" }, + { name = "service_environment", source = "test" }, + { name = "service_version", source = "2" } + + # specific tags +] +fields = [ + { name = "load1", count = 8640, source = { type = "zipf", s = 5, v = 4, imax = 4, seed = 22 } }, + { name = "load15", count = 8640, source = { type = "zipf", s = 3, v = 4, imax = 4, seed = 23 } }, + { name = "load5", count = 8640, source = { type = "zipf", s = 7, v = 4, imax = 4, seed = 24 } }, + { name = "n_cpus", count = 8640, source = 8 }, +] From a90c83e5317138491649e2721642bb6bf838e9b3 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 2 May 2019 16:39:58 +0200 Subject: [PATCH 13/29] added random sleep (0;increaseInterval) on start --- cmd/query_benchmarker_influxdb/main.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 810a1442..b924db9f 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "log" + "math/rand" "net/rpc" "os" "runtime/pprof" @@ -198,6 +199,12 @@ func init() { } func main() { + if increaseInterval > 0 { + rst := time.Duration(rand.Int63n(int64(increaseInterval.Seconds()))) * time.Second + log.Printf("Random sleep for %v", rst) + time.Sleep(rst) + } + // Make pools to minimize heap usage: queryPool = sync.Pool{ New: func() interface{} { From 2eda0e2da80b0bf5776b4569abf2a71929a10930 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Fri, 3 May 2019 09:59:30 +0200 Subject: [PATCH 14/29] fixed random sleep time generator (was pseudorandom) --- cmd/query_benchmarker_influxdb/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index b924db9f..25c94a48 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -199,7 +199,8 @@ func init() { } func main() { - if increaseInterval > 0 { + if gradualWorkersIncrease { + rand.Seed(int64(time.Now().Nanosecond()) << uint(clientIndex)) rst := time.Duration(rand.Int63n(int64(increaseInterval.Seconds()))) * time.Second log.Printf("Random sleep for %v", rst) time.Sleep(rst) From 263dd7428db1f46c3a23f963f96cf3c766bcfded Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 6 May 2019 16:01:10 +0200 Subject: [PATCH 15/29] added short random sleep between queries in batch --- cmd/query_benchmarker_influxdb/main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/query_benchmarker_influxdb/main.go b/cmd/query_benchmarker_influxdb/main.go index 25c94a48..afd796c7 100644 --- a/cmd/query_benchmarker_influxdb/main.go +++ b/cmd/query_benchmarker_influxdb/main.go @@ -201,10 +201,11 @@ func init() { func main() { if gradualWorkersIncrease { rand.Seed(int64(time.Now().Nanosecond()) << uint(clientIndex)) - rst := 
time.Duration(rand.Int63n(int64(increaseInterval.Seconds()))) * time.Second + // random sleep is done in Ansible task // TODO remove +/* rst := time.Duration(rand.Int63n(int64(increaseInterval.Seconds()))) * time.Second log.Printf("Random sleep for %v", rst) time.Sleep(rst) - } +*/ } // Make pools to minimize heap usage: queryPool = sync.Pool{ @@ -581,6 +582,9 @@ func processQueries(w HTTPClient) error { for _, q := range queries { go processSingleQuery(w, q, opts, errCh, doneCh) queriesSeen++ + if gradualWorkersIncrease { + time.Sleep(time.Duration(rand.Int63n(150)) * time.Millisecond) // random sleep 0-150ms + } } loop: From 96889ef4fafc32618f1b32a790ffb4fe1e0e5ab2 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 9 May 2019 17:21:16 +0200 Subject: [PATCH 16/29] log avg updates --- cmd/bulk_load_influx/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index 3d7c284d..bf1f5c70 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -820,7 +820,8 @@ func processStats(telemetrySink chan *report.Point) { i++ - if now.Sub(lastRefresh).Seconds() >= 1 { + dt := now.Sub(lastRefresh).Seconds() + if dt >= 1 { movingAverageStat.UpdateAvg(now, workers) lastRefresh = now // Report telemetry, if applicable: @@ -836,6 +837,7 @@ func processStats(telemetrySink chan *report.Point) { p.AddIntField("load_workers", workers) telemetrySink <- p } + log.Printf("mean updated after %f", dt) } // print stats to stderr (if printInterval is greater than zero): From e7f35b946cde246f1eab9bbab169aaae63fe31c5 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 14 May 2019 10:05:11 +0200 Subject: [PATCH 17/29] removed debug output --- cmd/bulk_load_influx/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index bf1f5c70..86da7133 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -837,7 +837,6 @@ func processStats(telemetrySink chan *report.Point) { p.AddIntField("load_workers", workers) telemetrySink <- p } - log.Printf("mean updated after %f", dt) } // print stats to stderr (if printInterval is greater than zero): From b40305cf34a800c9b087020854da27ac8ad1f042 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 19 Jun 2019 14:05:38 +0200 Subject: [PATCH 18/29] added response status code check --- cmd/bulk_load_influx/main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index 86da7133..d2dd601b 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -747,7 +747,7 @@ func createDb(daemonUrl, dbname string, replicationFactor int) error { defer resp.Body.Close() // does the body need to be read into the void? 
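	// (Answering the question above as a sketch, not something this patch
	// asserts: under Go's net/http, draining the body before Close is what
	// lets the keep-alive connection be reused, e.g.
	//	_, _ = io.Copy(ioutil.Discard, resp.Body)
	// where io.Copy and ioutil.Discard come from the standard library.)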
- if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { return fmt.Errorf("bad db create") } return nil @@ -763,6 +763,10 @@ func listDatabases(daemonUrl string) ([]string, error) { defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("listDatabases returned status code: %v", resp.StatusCode) + } + body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err @@ -783,7 +787,7 @@ func listDatabases(daemonUrl string) ([]string, error) { return nil, err } - ret := []string{} + var ret []string for _, nestedName := range listing.Results[0].Series[0].Values { name := nestedName[0] // the _internal database is skipped: From dbd1ac643e4ecbbc7c2d5983e1a3125230374012 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 19 Jun 2019 14:12:19 +0200 Subject: [PATCH 19/29] more descriptive error msg --- cmd/bulk_load_influx/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index d2dd601b..7ec60398 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -748,7 +748,7 @@ func createDb(daemonUrl, dbname string, replicationFactor int) error { // does the body need to be read into the void? if resp.StatusCode != http.StatusOK { - return fmt.Errorf("bad db create") + return fmt.Errorf("createDb returned status code: %v", resp.StatusCode) } return nil } From 994e9ddcd82f82af8f0d3db5c89834b9cf268281 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 20 Jun 2019 16:57:53 +0200 Subject: [PATCH 20/29] added new metric 'values_written' --- cmd/bulk_load_influx/main.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index 7ec60398..18d86da6 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -824,10 +824,23 @@ func processStats(telemetrySink chan *report.Point) { i++ + // Report telemetry, if applicable: + if telemetrySink != nil { + p := report.GetPointFromGlobalPool() + p.Init("benchmarks_telemetry", now.UnixNano()) + for _, tagpair := range reportTags { + p.AddTag(tagpair[0], tagpair[1]) + } + p.AddTag("client_type", "load") + p.AddFloat64Field("values_written", stat.Value) + telemetrySink <- p + } + dt := now.Sub(lastRefresh).Seconds() if dt >= 1 { movingAverageStat.UpdateAvg(now, workers) lastRefresh = now + // Report telemetry, if applicable: if telemetrySink != nil { p := report.GetPointFromGlobalPool() From a5bb28e10c3f72c3958371e6d41b76f5d70966f0 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 4 Jul 2019 12:55:00 +0200 Subject: [PATCH 21/29] interleave hosts first --- bulk_data_gen/dashboard/generate_data.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/bulk_data_gen/dashboard/generate_data.go b/bulk_data_gen/dashboard/generate_data.go index 5d6c0a88..eca8523f 100644 --- a/bulk_data_gen/dashboard/generate_data.go +++ b/bulk_data_gen/dashboard/generate_data.go @@ -76,15 +76,14 @@ func (d *DashboardSimulatorConfig) ToSimulator() *DashboardSimulator { // Next advances a Point to the next state in the generator. 
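// After this change, emission is host-major: each host emits all of its
// NHostSims simulated measurements before the simulator advances to the next
// host, and every host is ticked one epoch once the last host has emitted.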
func (d *DashboardSimulator) Next(p *Point) { - // switch to the next metric if needed - if d.hostIndex == len(d.hosts) { - d.hostIndex = 0 - d.simulatedMeasurementIndex++ - } - + // switch to the next host if needed if d.simulatedMeasurementIndex == NHostSims { d.simulatedMeasurementIndex = 0 + d.hostIndex++ + } + if d.hostIndex == len(d.hosts) { + d.hostIndex = 0 for i := 0; i < len(d.hosts); i++ { d.hosts[i].TickAll(devops.EpochDuration) } @@ -108,7 +107,7 @@ func (d *DashboardSimulator) Next(p *Point) { host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p) d.madePoints++ - d.hostIndex++ + d.simulatedMeasurementIndex++ d.madeValues += int64(len(p.FieldValues)) return From 0ed021b065eb9bcc004d96b49e2240bd367002fe Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 10:40:18 +0200 Subject: [PATCH 22/29] exact values counting --- cmd/bulk_load_influx/main.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index 18d86da6..7272f558 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -36,8 +36,8 @@ import ( "strconv" ) -// TODO VH: This should be calculated from available simulation data -const ValuesPerMeasurement = 9.63636 // dashboard use-case, original value was: 11.2222 +// Approx number of values per measurement, used for initial batch size guess +const ApproxValuesPerMeasurement = 9.63636 // dashboard use-case, original value (for devops) was: 11.2222 // TODO AP: Maybe useless const RateControlGranularity = 1000 // 1000 ms = 1s @@ -118,6 +118,7 @@ type statsMap map[string]*StatGroup type batch struct { Buffer *bytes.Buffer Items int + Values int } // Parse args: @@ -194,7 +195,7 @@ func init() { if ingestRateLimit > 0 { ingestionRateGran = (float64(ingestRateLimit) / float64(workers)) / (float64(1000) / float64(RateControlGranularity)) log.Printf("Using worker ingestion rate %v values/%v ms", ingestionRateGran, RateControlGranularity) - recommendedBatchSize := int((ingestionRateGran / ValuesPerMeasurement) * 0.20) + recommendedBatchSize := int((ingestionRateGran / ApproxValuesPerMeasurement) * 0.20) log.Printf("Calculated batch size hint: %v (allowed min: %v max: %v)", recommendedBatchSize, RateControlMinBatchSize, batchSize) if recommendedBatchSize < RateControlMinBatchSize { recommendedBatchSize = RateControlMinBatchSize @@ -203,7 +204,7 @@ func init() { } maxBatchSize = batchSize if recommendedBatchSize != batchSize { - log.Printf("Adjusting batchSize from %v to %v (%v values in 1 batch)", batchSize, recommendedBatchSize, float32(recommendedBatchSize)*ValuesPerMeasurement) + log.Printf("Adjusting batchSize from %v to %v (%v values in 1 batch)", batchSize, recommendedBatchSize, float32(recommendedBatchSize) * ApproxValuesPerMeasurement) batchSize = recommendedBatchSize } } else { @@ -473,7 +474,7 @@ func scan(itemsPerBatch int, doneCh chan int) (int64, int64, int64) { scanFinished = false buf := bufPool.Get().(*bytes.Buffer) - var n int + var n, values int var itemsRead, bytesRead int64 var totalPoints, totalValues int64 @@ -509,6 +510,18 @@ outer: log.Fatal(err) } continue + } else { + lineParts := strings.Split(line," ") + if len(lineParts) != 3 { + log.Fatalf("invalid protocol line: '%s'", line) + } + fieldsParts := strings.Split(lineParts[1], ",") + fieldCnt := len(fieldsParts) + if fieldCnt == 0 { + log.Fatalf("invalid fields parts: '%s'", lineParts[1]) + } + values += fieldCnt + totalValues += int64(fieldCnt) } 
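	// Note, as an added caveat: splitting on ' ' and on ',' assumes the
	// generator never emits escaped spaces or commas; arbitrary line
	// protocol would need a real parser here.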
itemsRead++ batchItemCount++ @@ -522,9 +535,10 @@ outer: batchItemCount = 0 bytesRead += int64(buf.Len()) - batchChan <- batch{buf, n} + batchChan <- batch{buf, n, values} buf = bufPool.Get().(*bytes.Buffer) n = 0 + values = 0 if timeLimit > 0 && time.Now().After(deadline) { endedPrematurely = true @@ -559,7 +573,7 @@ outer: // Finished reading input, make sure last batch goes out. if n > 0 { - batchChan <- batch{buf, n} + batchChan <- batch{buf, n, values} } // Closing inputDone signals to the application that we've read everything and can now shut down. @@ -568,9 +582,9 @@ outer: if itemsRead != totalPoints { // totalPoints is unknown (0) when exiting prematurely due to time limit if !endedPrematurely { log.Fatalf("Incorrent number of read points: %d, expected: %d:", itemsRead, totalPoints) - } else { + } /*else { totalValues = int64(float64(itemsRead) * ValuesPerMeasurement) // needed for statistics summary - } + }*/ } scanFinished = true log.Println("Scan finished") @@ -631,7 +645,7 @@ func processBatches(w *HTTPWriter, backoffSrc chan bool, telemetrySink chan *rep telemetrySink <- p } time.Sleep(sleepTime) - sleepTime += backoff // sleep longer if backpressure comes again + sleepTime += backoff // sleep longer if back pressure comes again if sleepTime > 10*backoff { // but not longer than 10x default backoff time log.Printf("[worker %s] sleeping on backoff response way too long (10x %v)", telemetryWorkerLabel, backoff) sleepTime = 10 * backoff @@ -661,7 +675,7 @@ func processBatches(w *HTTPWriter, backoffSrc chan bool, telemetrySink chan *rep // Normally report after each batch reportStat := true - valuesWritten := float64(batch.Items) * ValuesPerMeasurement + valuesWritten := float64(batch.Values) //float64(batch.Items) * ValuesPerMeasurement // Apply ingest rate control if set if ingestRateLimit > 0 { From bf0b337beb9d73d211947dab2630446e95999c9c Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 11:05:22 +0200 Subject: [PATCH 23/29] exact values counting --- bulk_data_gen/devops/devops_generate_data.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/bulk_data_gen/devops/devops_generate_data.go b/bulk_data_gen/devops/devops_generate_data.go index e104dce4..1e88a097 100644 --- a/bulk_data_gen/devops/devops_generate_data.go +++ b/bulk_data_gen/devops/devops_generate_data.go @@ -75,15 +75,14 @@ func (d *DevopsSimulatorConfig) ToSimulator() *DevopsSimulator { // Next advances a Point to the next state in the generator. func (d *DevopsSimulator) Next(p *Point) { - // switch to the next metric if needed - if d.hostIndex == len(d.hosts) { - d.hostIndex = 0 - d.simulatedMeasurementIndex++ - } - + // switch to the next host if needed if d.simulatedMeasurementIndex == NHostSims { d.simulatedMeasurementIndex = 0 + d.hostIndex++ + } + if d.hostIndex == len(d.hosts) { + d.hostIndex = 0 for i := 0; i < len(d.hosts); i++ { d.hosts[i].TickAll(EpochDuration) } @@ -107,7 +106,7 @@ func (d *DevopsSimulator) Next(p *Point) { host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p) d.madePoints++ - d.hostIndex++ + d.simulatedMeasurementIndex++ d.madeValues += int64(len(p.FieldValues)) return From c64b74a29db09c0d0c804b49b6eb1afbd76cf62c Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 11:06:07 +0200 Subject: [PATCH 24/29] Revert "exact values counting" This reverts commit bf0b337beb9d73d211947dab2630446e95999c9c. 
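For reference, a minimal self-contained sketch (illustrative constants and
names, not from the codebase) of the host-major emission order that the
interleaving patches in this series introduce and that this revert
temporarily undoes:

package main

import "fmt"

func main() {
	const nHosts, nSims = 3, 2
	hostIndex, simIndex := 0, 0
	for i := 0; i < nHosts*nSims*2; i++ { // two epochs
		if simIndex == nSims {
			simIndex = 0
			hostIndex++
		}
		if hostIndex == nHosts {
			hostIndex = 0
			fmt.Println("tick: advance all hosts one epoch")
		}
		fmt.Printf("emit host %d, measurement %d\n", hostIndex, simIndex)
		simIndex++
	}
}

Run as-is, it prints all measurements for each host in turn and a tick line
whenever the host list wraps, mirroring the new hostIndex and
simulatedMeasurementIndex handling.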
--- bulk_data_gen/devops/devops_generate_data.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/bulk_data_gen/devops/devops_generate_data.go b/bulk_data_gen/devops/devops_generate_data.go index 1e88a097..e104dce4 100644 --- a/bulk_data_gen/devops/devops_generate_data.go +++ b/bulk_data_gen/devops/devops_generate_data.go @@ -75,14 +75,15 @@ func (d *DevopsSimulatorConfig) ToSimulator() *DevopsSimulator { // Next advances a Point to the next state in the generator. func (d *DevopsSimulator) Next(p *Point) { - // switch to the next host if needed + // switch to the next metric if needed + if d.hostIndex == len(d.hosts) { + d.hostIndex = 0 + d.simulatedMeasurementIndex++ + } + if d.simulatedMeasurementIndex == NHostSims { d.simulatedMeasurementIndex = 0 - d.hostIndex++ - } - if d.hostIndex == len(d.hosts) { - d.hostIndex = 0 for i := 0; i < len(d.hosts); i++ { d.hosts[i].TickAll(EpochDuration) } @@ -106,7 +107,7 @@ func (d *DevopsSimulator) Next(p *Point) { host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p) d.madePoints++ - d.simulatedMeasurementIndex++ + d.hostIndex++ d.madeValues += int64(len(p.FieldValues)) return From 1a7366701288ab0f97b1f1d0b98171d9a590ca2e Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 11:09:23 +0200 Subject: [PATCH 25/29] changed devops measurements interleaving --- bulk_data_gen/devops/devops_generate_data.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/bulk_data_gen/devops/devops_generate_data.go b/bulk_data_gen/devops/devops_generate_data.go index e104dce4..1e88a097 100644 --- a/bulk_data_gen/devops/devops_generate_data.go +++ b/bulk_data_gen/devops/devops_generate_data.go @@ -75,15 +75,14 @@ func (d *DevopsSimulatorConfig) ToSimulator() *DevopsSimulator { // Next advances a Point to the next state in the generator. 
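// (Same host-major ordering as the dashboard simulator patch earlier in this
// series; see the standalone sketch in the revert note above.)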
func (d *DevopsSimulator) Next(p *Point) { - // switch to the next metric if needed - if d.hostIndex == len(d.hosts) { - d.hostIndex = 0 - d.simulatedMeasurementIndex++ - } - + // switch to the next host if needed if d.simulatedMeasurementIndex == NHostSims { d.simulatedMeasurementIndex = 0 + d.hostIndex++ + } + if d.hostIndex == len(d.hosts) { + d.hostIndex = 0 for i := 0; i < len(d.hosts); i++ { d.hosts[i].TickAll(EpochDuration) } @@ -107,7 +106,7 @@ func (d *DevopsSimulator) Next(p *Point) { host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p) d.madePoints++ - d.hostIndex++ + d.simulatedMeasurementIndex++ d.madeValues += int64(len(p.FieldValues)) return From 205e15ef47e7ac6d30b37cdaf7336c5045e6d105 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 12:33:26 +0200 Subject: [PATCH 26/29] faster field counting --- cmd/bulk_load_influx/main.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index 7272f558..d4421b9a 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -511,12 +511,11 @@ outer: } continue } else { - lineParts := strings.Split(line," ") + lineParts := strings.Split(line," ") // "measurement,tags fields timestamp" if len(lineParts) != 3 { log.Fatalf("invalid protocol line: '%s'", line) } - fieldsParts := strings.Split(lineParts[1], ",") - fieldCnt := len(fieldsParts) + fieldCnt := strings.Count(lineParts[1], "=") if fieldCnt == 0 { log.Fatalf("invalid fields parts: '%s'", lineParts[1]) } From 86902ebfbdd7b029dfe5ac386f7df26fc88ce348 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 12:38:05 +0200 Subject: [PATCH 27/29] increased output buffer size --- cmd/bulk_data_gen/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/bulk_data_gen/main.go b/cmd/bulk_data_gen/main.go index c13a1698..c9fb53b0 100644 --- a/cmd/bulk_data_gen/main.go +++ b/cmd/bulk_data_gen/main.go @@ -146,7 +146,7 @@ func main() { log.Printf("Using config file %s\n", configFile) } - out := bufio.NewWriterSize(os.Stdout, 4<<20) + out := bufio.NewWriterSize(os.Stdout, 16<<20) defer out.Flush() var sim common.Simulator From 03bf644ea421260b6f0e1a23675f08d46b5bcf24 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Wed, 10 Jul 2019 12:38:29 +0200 Subject: [PATCH 28/29] increased input buffer size --- cmd/bulk_load_influx/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index d4421b9a..a7adfb32 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -486,7 +486,7 @@ func scan(itemsPerBatch int, doneCh chan int) (int64, int64, int64) { var batchItemCount uint64 - scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 4*1024*1024)) + scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 16*1024*1024)) outer: for scanner.Scan() { if itemsRead == itemLimit { From a96a43b7c8b6ff310d94fd8fc76a418a5e636840 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Thu, 11 Jul 2019 14:08:20 +0200 Subject: [PATCH 29/29] reverting to original buffer size 4MB --- cmd/bulk_data_gen/main.go | 2 +- cmd/bulk_load_influx/main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/bulk_data_gen/main.go b/cmd/bulk_data_gen/main.go index c9fb53b0..c13a1698 100644 --- a/cmd/bulk_data_gen/main.go +++ b/cmd/bulk_data_gen/main.go @@ -146,7 +146,7 @@ func main() { log.Printf("Using config file %s\n", configFile) } - out := 
bufio.NewWriterSize(os.Stdout, 16<<20) + out := bufio.NewWriterSize(os.Stdout, 4<<20) defer out.Flush() var sim common.Simulator diff --git a/cmd/bulk_load_influx/main.go b/cmd/bulk_load_influx/main.go index a7adfb32..d4421b9a 100644 --- a/cmd/bulk_load_influx/main.go +++ b/cmd/bulk_load_influx/main.go @@ -486,7 +486,7 @@ func scan(itemsPerBatch int, doneCh chan int) (int64, int64, int64) { var batchItemCount uint64 - scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 16*1024*1024)) + scanner := bufio.NewScanner(bufio.NewReaderSize(os.Stdin, 4*1024*1024)) outer: for scanner.Scan() { if itemsRead == itemLimit {
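
As a closing reference, a minimal self-contained sketch of the exact values
counting used by the loader patches above, assuming well-formed generator
output with no line-protocol escaping; countFieldValues and the sample line
are illustrative, not taken from the codebase:

package main

import (
	"fmt"
	"strings"
)

// countFieldValues returns the number of field values on one line-protocol
// line by counting '=' in the field set. This assumes no escaped spaces or
// '=' characters, which holds for the generator's output but not for
// arbitrary line protocol.
func countFieldValues(line string) (int, error) {
	parts := strings.Split(line, " ") // "measurement,tags fields timestamp"
	if len(parts) != 3 {
		return 0, fmt.Errorf("invalid protocol line: '%s'", line)
	}
	n := strings.Count(parts[1], "=")
	if n == 0 {
		return 0, fmt.Errorf("invalid fields part: '%s'", parts[1])
	}
	return n, nil
}

func main() {
	n, err := countFieldValues("system,host=data_1 load1=3,load5=2,load15=1 1562760000000000000")
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 3
}

Similarly, the zipf field sources in bonitoo.toml (s, v, imax, seed)
presumably map onto math/rand's Zipf generator; a sketch of that assumed
mapping:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	r := rand.New(rand.NewSource(22)) // seed = 22 from the config
	z := rand.NewZipf(r, 5, 4, 4)     // s = 5, v = 4, imax = 4
	fmt.Println(z.Uint64())           // one sample in [0, imax]
}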