Skip to content

Commit

Permalink
Merge branch 'feature/datagen2-toml-config' into feature/query_benchm…
Browse files Browse the repository at this point in the history
…arker_teardown_sync
  • Loading branch information
alespour authored Aug 8, 2019
2 parents f1a805a + a96a43b commit 561af60
Show file tree
Hide file tree
Showing 14 changed files with 969 additions and 34 deletions.
352 changes: 352 additions & 0 deletions bonitoo-mix.toml

Large diffs are not rendered by default.

352 changes: 352 additions & 0 deletions bonitoo.toml

Large diffs are not rendered by default.

134 changes: 134 additions & 0 deletions bulk_data_gen/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package common

import (
"bytes"
"encoding/json"
"fmt"
"github.com/pelletier/go-toml"
"log"
"math/rand"
"reflect"
)

type Source interface{}

var DefaultValueGenerator = map[string]interface{}{
"type": "default",
}

func getSourceValue(s *Source, measurementName, itemKey string, itemDefaultValue interface{}) interface{} {
switch reflect.Indirect(reflect.ValueOf(s)).Elem().Kind() {
case reflect.Array:
array := (*s).([]interface{})
return array[rand.Int63n(int64(len(array)))]
case reflect.Map:
m := (*s).(map[string]interface{})
if reflect.DeepEqual(m, DefaultValueGenerator) {
return itemDefaultValue
}
log.Fatalf("generators are not supported (yet) ['%s/%s']", measurementName, itemKey)
default: // primitive types
return *s
}
panic("unreachable")
}

type Tag struct {
Name string
Source Source
}

type Field struct {
Count int
Name string
Source Source
}

type Measurement struct {
Name string
Sample float32
Tags []Tag
Fields []Field
}

type ExternalConfig struct {
measurements []Measurement
}

var Config *ExternalConfig

func (c *ExternalConfig) String() string {
var buf bytes.Buffer
for _,m := range c.measurements {
buf.WriteString(fmt.Sprintf("definition: %s, sample: %f\n", m.Name, m.Sample))
buf.WriteString(fmt.Sprintf(" tags:\n"))
for _,tag := range m.Tags {
buf.WriteString(fmt.Sprintf(" tag: %s\n", tag.Name))
buf.WriteString(fmt.Sprintf(" source: %v\n", tag.Source))
}
buf.WriteString(fmt.Sprintf(" fields:\n"))
for _,field := range m.Fields {
buf.WriteString(fmt.Sprintf(" field: %s, count: %d\n", field.Name, field.Count))
buf.WriteString(fmt.Sprintf(" source: %v\n", field.Source))
}
}
return buf.String()
}

func (c *ExternalConfig) GetTagBytesValue(measurementName, tagKey []byte, failIfNotFound bool, defaultValue []byte) []byte {
return []byte(c.GetTagValue(string(measurementName), string(tagKey), failIfNotFound, string(defaultValue)))
}

func (c *ExternalConfig) GetTagValue(measurementName, tagKey string, failIfNotFound bool, defaultValue string) string {
for _,m := range c.measurements {
if "" == measurementName || m.Name == measurementName {
for _,tag := range m.Tags {
if tag.Name == tagKey {
return fmt.Sprintf("%v", getSourceValue(&tag.Source, m.Name, tag.Name, defaultValue))
}
}
}
}
if failIfNotFound {
log.Fatalf("value for tag '%s/%s' not found", measurementName, tagKey)
}
return "" // defaultValue ?
}

func (c *ExternalConfig) GetFieldBytesValue(measurementName, tagKey []byte, failIfNotFound bool, defaultValue interface{}) interface{} {
return c.GetFieldValue(string(measurementName), string(tagKey), failIfNotFound, defaultValue)
}

func (c *ExternalConfig) GetFieldValue(measurementName, fieldKey string, failIfNotFound bool, defaultValue interface{}) interface{} {
for _,m := range c.measurements {
if "" == measurementName || m.Name == measurementName {
for _,field := range m.Fields {
if field.Name == fieldKey {
return getSourceValue(&field.Source, m.Name, field.Name, defaultValue)
}
}
}
}
if failIfNotFound {
log.Fatalf("value for field '%s/%s' not found", measurementName, fieldKey)
}
return nil // defaultValue ?
}

func NewConfig(path string) (*ExternalConfig, error) {
toml, err := toml.LoadFile(path)
if err != nil {
return nil, fmt.Errorf("file load failed: %v", err)
}
obj := toml.ToMap()["measurements"]
b, err := json.Marshal(obj)
if err != nil {
return nil, fmt.Errorf("marshall failed: %v", err)
}
config := ExternalConfig{}
err = json.Unmarshal(b, &config.measurements)
if err != nil {
return nil, fmt.Errorf("unmarshall failed: %v", err)
}
return &config, nil
}
13 changes: 6 additions & 7 deletions bulk_data_gen/dashboard/generate_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ func (d *DashboardSimulatorConfig) ToSimulator() *DashboardSimulator {

// Next advances a Point to the next state in the generator.
func (d *DashboardSimulator) Next(p *Point) {
// switch to the next metric if needed
if d.hostIndex == len(d.hosts) {
d.hostIndex = 0
d.simulatedMeasurementIndex++
}

// switch to the next host if needed
if d.simulatedMeasurementIndex == NHostSims {
d.simulatedMeasurementIndex = 0
d.hostIndex++
}

if d.hostIndex == len(d.hosts) {
d.hostIndex = 0
for i := 0; i < len(d.hosts); i++ {
d.hosts[i].TickAll(devops.EpochDuration)
}
Expand All @@ -108,7 +107,7 @@ func (d *DashboardSimulator) Next(p *Point) {
host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p)

d.madePoints++
d.hostIndex++
d.simulatedMeasurementIndex++
d.madeValues += int64(len(p.FieldValues))

return
Expand Down
13 changes: 13 additions & 0 deletions bulk_data_gen/dashboard/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ func NewHost(i int, offset int, start time.Time) Host {

SimulatedMeasurements: sm,
}

// partial override from external config
if Config != nil {
h.Region = Config.GetTagBytesValue(nil, devops.MachineTagKeys[1], true, []byte(h.Region))
h.Datacenter = Config.GetTagBytesValue(nil, devops.MachineTagKeys[2], true, []byte(h.Datacenter))
h.Rack = Config.GetTagBytesValue(nil, devops.MachineTagKeys[3], true, []byte(h.Rack))
h.OS = Config.GetTagBytesValue(nil, devops.MachineTagKeys[4], true, []byte(h.OS))
h.Arch = Config.GetTagBytesValue(nil, devops.MachineTagKeys[5], true, []byte(h.Arch))
h.Service = Config.GetTagBytesValue(nil, devops.MachineTagKeys[7], true, []byte(h.Service))
h.ServiceVersion = Config.GetTagBytesValue(nil, devops.MachineTagKeys[8], true, []byte(h.ServiceVersion))
h.ServiceEnvironment = Config.GetTagBytesValue(nil, devops.MachineTagKeys[9], true, []byte(h.ServiceEnvironment))
}

currentHostIndex++
return h
}
Expand Down
4 changes: 4 additions & 0 deletions bulk_data_gen/devops/devops_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewDiskMeasurement(start time.Time, sda int) *DiskMeasurement {
}
path := []byte(fmt.Sprintf("/dev/sda%d", sda))
fsType := DiskFSTypeChoices[rand.Intn(len(DiskFSTypeChoices))]
if Config != nil { // partial override from external config
path = Config.GetTagBytesValue(DiskByteString, DiskTags[0], true, path)
fsType = Config.GetTagBytesValue(DiskByteString, DiskTags[1], true, fsType)
}
return &DiskMeasurement{
path: path,
fsType: fsType,
Expand Down
3 changes: 3 additions & 0 deletions bulk_data_gen/devops/devops_diskio.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func NewDiskIOMeasurement(start time.Time) *DiskIOMeasurement {
}

serial := []byte(fmt.Sprintf("%03d-%03d-%03d", rand.Intn(1000), rand.Intn(1000), rand.Intn(1000)))
if Config != nil { // partial override from external config
serial = Config.GetTagBytesValue(DiskIOByteString, SerialByteString, true, serial)
}
return &DiskIOMeasurement{
serial: serial,

Expand Down
13 changes: 6 additions & 7 deletions bulk_data_gen/devops/devops_generate_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,14 @@ func (d *DevopsSimulatorConfig) ToSimulator() *DevopsSimulator {

// Next advances a Point to the next state in the generator.
func (d *DevopsSimulator) Next(p *Point) {
// switch to the next metric if needed
if d.hostIndex == len(d.hosts) {
d.hostIndex = 0
d.simulatedMeasurementIndex++
}

// switch to the next host if needed
if d.simulatedMeasurementIndex == NHostSims {
d.simulatedMeasurementIndex = 0
d.hostIndex++
}

if d.hostIndex == len(d.hosts) {
d.hostIndex = 0
for i := 0; i < len(d.hosts); i++ {
d.hosts[i].TickAll(EpochDuration)
}
Expand All @@ -107,7 +106,7 @@ func (d *DevopsSimulator) Next(p *Point) {
host.SimulatedMeasurements[d.simulatedMeasurementIndex].ToPoint(p)

d.madePoints++
d.hostIndex++
d.simulatedMeasurementIndex++
d.madeValues += int64(len(p.FieldValues))

return
Expand Down
3 changes: 3 additions & 0 deletions bulk_data_gen/devops/devops_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func NewNetMeasurement(start time.Time) *NetMeasurement {
}

interfaceName := []byte(fmt.Sprintf("eth%d", rand.Intn(4)))
if Config != nil { // partial override from external config
interfaceName = Config.GetTagBytesValue(NetByteString, NetTags[0], true, interfaceName)
}
return &NetMeasurement{
interfaceName: interfaceName,

Expand Down
4 changes: 4 additions & 0 deletions bulk_data_gen/devops/devops_nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func NewNginxMeasurement(start time.Time) *NginxMeasurement {

serverName := []byte(fmt.Sprintf("nginx_%d", rand.Intn(100000)))
port := []byte(fmt.Sprintf("%d", rand.Intn(20000)+1024))
if Config != nil { // partial override from external config
serverName = Config.GetTagBytesValue(NginxByteString, NginxTags[1], true, serverName)
port = Config.GetTagBytesValue(NginxByteString, NginxTags[0], true, port)
}
return &NginxMeasurement{
port: port,
serverName: serverName,
Expand Down
4 changes: 4 additions & 0 deletions bulk_data_gen/devops/devops_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func NewRedisMeasurement(start time.Time) *RedisMeasurement {

serverName := []byte(fmt.Sprintf("redis_%d", rand.Intn(100000)))
port := []byte(fmt.Sprintf("%d", rand.Intn(20000)+1024))
if Config != nil { // partial override from external config
serverName = Config.GetTagBytesValue(RedisByteString, RedisTags[1], true, serverName)
port = Config.GetTagBytesValue(RedisByteString, RedisTags[0], true, port)
}
return &RedisMeasurement{
port: port,
serverName: serverName,
Expand Down
16 changes: 13 additions & 3 deletions cmd/bulk_data_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ var (
daemonUrl string
dbName string

format string
useCase string

format string
useCase string
configFile string
scaleVar int64
scaleVarOffset int64
samplingInterval time.Duration
Expand Down Expand Up @@ -67,6 +67,7 @@ func init() {
flag.Int64Var(&scaleVar, "scale-var", 1, "Scaling variable specific to the use case.")
flag.Int64Var(&scaleVarOffset, "scale-var-offset", 0, "Scaling variable offset specific to the use case.")
flag.DurationVar(&samplingInterval, "sampling-interval", devops.EpochDuration, "Simulated sampling interval.")
flag.StringVar(&configFile, "config-file", "", "Simulator config file in TOML format")

flag.StringVar(&timestampStartStr, "timestamp-start", common.DefaultDateTimeStart, "Beginning timestamp (RFC3339).")
flag.StringVar(&timestampEndStr, "timestamp-end", common.DefaultDateTimeEnd, "Ending timestamp (RFC3339).")
Expand Down Expand Up @@ -136,6 +137,15 @@ func main() {

common.Seed(seed)

if configFile != "" {
c, err := common.NewConfig(configFile)
if err != nil {
log.Fatalf("external config error: %v", err)
}
common.Config = c
log.Printf("Using config file %s\n", configFile)
}

out := bufio.NewWriterSize(os.Stdout, 4<<20)
defer out.Flush()

Expand Down
Loading

0 comments on commit 561af60

Please sign in to comment.