diff --git a/docker/release/.tag b/docker/release/.tag index 2da577b..9eea834 100644 --- a/docker/release/.tag +++ b/docker/release/.tag @@ -1 +1 @@ -rwynn/mongofluxd:1.1.1 +rwynn/mongofluxd:1.2.0 diff --git a/docker/release/.version b/docker/release/.version index 524cb55..26aaba0 100644 --- a/docker/release/.version +++ b/docker/release/.version @@ -1 +1 @@ -1.1.1 +1.2.0 diff --git a/mongofluxd.go b/mongofluxd.go index 019323f..1819a57 100644 --- a/mongofluxd.go +++ b/mongofluxd.go @@ -38,7 +38,7 @@ var errorLog *log.Logger = log.New(os.Stdout, "ERROR ", log.Flags()) const ( Name = "mongofluxd" - Version = "1.1.1" + Version = "1.2.0" mongoUrlDefault = "mongodb://localhost:27017" influxUrlDefault = "http://localhost:8086" influxClientsDefault = 10 @@ -253,7 +253,7 @@ func (ctx *InfluxCtx) setupMeasurements() error { } return nil } else { - return fmt.Errorf("at least one measurement is required") + return fmt.Errorf("At least one measurement is required") } } @@ -309,9 +309,6 @@ func (ctx *InfluxCtx) writeBatch() (err error) { } } ctx.m = make(map[string]client.BatchPoints) - if err == nil { - err = ctx.saveTs() - } return } @@ -637,19 +634,19 @@ func (config *configOptions) LoadPlugin() *configOptions { } p, err := plugin.Open(config.PluginPath) if err != nil { - errorLog.Panicf("Unable to load plugin <%s>: %s", config.PluginPath, err) + errorLog.Fatalf("Unable to load plugin <%s>: %s", config.PluginPath, err) } for _, m := range config.Measurement { if m.Symbol != "" { f, err := p.Lookup(m.Symbol) if err != nil { - errorLog.Panicf("Unable to lookup symbol <%s> for plugin <%s>: %s", m.Symbol, config.PluginPath, err) + errorLog.Fatalf("Unable to lookup symbol <%s> for plugin <%s>: %s", m.Symbol, config.PluginPath, err) } switch f.(type) { case func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error): m.plug = f.(func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error)) default: - errorLog.Panicf("Plugin symbol <%s> must be typed %T", m.Symbol, m.plug) + errorLog.Fatalf("Plugin symbol <%s> must be typed %T", m.Symbol, m.plug) } } } @@ -868,7 +865,7 @@ func main() { config.LoadConfigFile().SetDefaults().LoadPlugin() if len(config.Measurement) == 0 { - errorLog.Panicf("at least one measurement is required") + errorLog.Fatalf("at least one measurement is required") } sigs := make(chan os.Signal, 1) @@ -878,7 +875,7 @@ func main() { mongoClient, err := config.DialMongo() if err != nil { - errorLog.Panicf("Unable to connect to mongodb using URL %s: %s", + errorLog.Fatalf("Unable to connect to mongodb using URL %s: %s", cleanMongoURL(config.MongoURL), err) } @@ -953,7 +950,7 @@ func main() { filter = gtm.ChainOpFilters(filterChain...) gtmBufferDuration, err := time.ParseDuration(config.GtmSettings.BufferDuration) if err != nil { - errorLog.Panicf("Unable to parse gtm buffer duration %s: %s", config.GtmSettings.BufferDuration, err) + errorLog.Fatalf("Unable to parse gtm buffer duration %s: %s", config.GtmSettings.BufferDuration, err) } httpConfig := client.HTTPConfig{ UserAgent: fmt.Sprintf("%s v%s", Name, Version), @@ -965,13 +962,13 @@ func main() { if config.InfluxPemFile != "" { tlsConfig, err := config.InfluxTLS() if err != nil { - errorLog.Panicf("Unable to configure TLS for InfluxDB: %s", err) + errorLog.Fatalf("Unable to configure TLS for InfluxDB: %s", err) } httpConfig.TLSConfig = tlsConfig } influxClient, err := client.NewHTTPClient(httpConfig) if err != nil { - errorLog.Panicf("Unable to create InfluxDB client: %s", err) + errorLog.Fatalf("Unable to create InfluxDB client: %s", err) } var directReadNs, changeStreamNs []string if config.DirectReads { @@ -1011,6 +1008,8 @@ func main() { defer wg.Done() flusher := time.NewTicker(1 * time.Second) defer flusher.Stop() + progress := time.NewTicker(10 * time.Second) + defer progress.Stop() influx := &InfluxCtx{ c: influxClient, m: make(map[string]client.BatchPoints), @@ -1021,13 +1020,19 @@ func main() { tokens: bson.M{}, } if err := influx.setupMeasurements(); err != nil { - errorLog.Panicf("Configuration error: %s", err) + errorLog.Fatalf("Configuration error: %s", err) } for { select { + case <-progress.C: + if err := influx.saveTs(); err != nil { + exitStatus = 1 + errorLog.Println(err) + } case <-flusher.C: if err := influx.writeBatch(); err != nil { - gtmCtx.ErrC <- err + exitStatus = 1 + errorLog.Println(err) } case err = <-gtmCtx.ErrC: if err == nil { @@ -1042,12 +1047,14 @@ func main() { exitStatus = 1 errorLog.Println(err) } + influx.saveTs() return } break } if err := influx.addPoint(op); err != nil { - gtmCtx.ErrC <- err + exitStatus = 1 + errorLog.Println(err) } } }