Skip to content

Commit

Permalink
new release
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Nov 21, 2019
1 parent d13c356 commit 9b9bf6e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docker/release/.tag
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rwynn/mongofluxd:1.1.1
rwynn/mongofluxd:1.2.0
2 changes: 1 addition & 1 deletion docker/release/.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.1
1.2.0
39 changes: 23 additions & 16 deletions mongofluxd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var errorLog *log.Logger = log.New(os.Stdout, "ERROR ", log.Flags())

const (
Name = "mongofluxd"
Version = "1.1.1"
Version = "1.2.0"
mongoUrlDefault = "mongodb://localhost:27017"
influxUrlDefault = "http://localhost:8086"
influxClientsDefault = 10
Expand Down Expand Up @@ -253,7 +253,7 @@ func (ctx *InfluxCtx) setupMeasurements() error {
}
return nil
} else {
return fmt.Errorf("at least one measurement is required")
return fmt.Errorf("At least one measurement is required")
}
}

Expand Down Expand Up @@ -309,9 +309,6 @@ func (ctx *InfluxCtx) writeBatch() (err error) {
}
}
ctx.m = make(map[string]client.BatchPoints)
if err == nil {
err = ctx.saveTs()
}
return
}

Expand Down Expand Up @@ -637,19 +634,19 @@ func (config *configOptions) LoadPlugin() *configOptions {
}
p, err := plugin.Open(config.PluginPath)
if err != nil {
errorLog.Panicf("Unable to load plugin <%s>: %s", config.PluginPath, err)
errorLog.Fatalf("Unable to load plugin <%s>: %s", config.PluginPath, err)
}
for _, m := range config.Measurement {
if m.Symbol != "" {
f, err := p.Lookup(m.Symbol)
if err != nil {
errorLog.Panicf("Unable to lookup symbol <%s> for plugin <%s>: %s", m.Symbol, config.PluginPath, err)
errorLog.Fatalf("Unable to lookup symbol <%s> for plugin <%s>: %s", m.Symbol, config.PluginPath, err)
}
switch f.(type) {
case func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error):
m.plug = f.(func(*mongofluxdplug.MongoDocument) ([]*mongofluxdplug.InfluxPoint, error))
default:
errorLog.Panicf("Plugin symbol <%s> must be typed %T", m.Symbol, m.plug)
errorLog.Fatalf("Plugin symbol <%s> must be typed %T", m.Symbol, m.plug)
}
}
}
Expand Down Expand Up @@ -868,7 +865,7 @@ func main() {
config.LoadConfigFile().SetDefaults().LoadPlugin()

if len(config.Measurement) == 0 {
errorLog.Panicf("at least one measurement is required")
errorLog.Fatalf("at least one measurement is required")
}

sigs := make(chan os.Signal, 1)
Expand All @@ -878,7 +875,7 @@ func main() {

mongoClient, err := config.DialMongo()
if err != nil {
errorLog.Panicf("Unable to connect to mongodb using URL %s: %s",
errorLog.Fatalf("Unable to connect to mongodb using URL %s: %s",
cleanMongoURL(config.MongoURL), err)
}

Expand Down Expand Up @@ -953,7 +950,7 @@ func main() {
filter = gtm.ChainOpFilters(filterChain...)
gtmBufferDuration, err := time.ParseDuration(config.GtmSettings.BufferDuration)
if err != nil {
errorLog.Panicf("Unable to parse gtm buffer duration %s: %s", config.GtmSettings.BufferDuration, err)
errorLog.Fatalf("Unable to parse gtm buffer duration %s: %s", config.GtmSettings.BufferDuration, err)
}
httpConfig := client.HTTPConfig{
UserAgent: fmt.Sprintf("%s v%s", Name, Version),
Expand All @@ -965,13 +962,13 @@ func main() {
if config.InfluxPemFile != "" {
tlsConfig, err := config.InfluxTLS()
if err != nil {
errorLog.Panicf("Unable to configure TLS for InfluxDB: %s", err)
errorLog.Fatalf("Unable to configure TLS for InfluxDB: %s", err)
}
httpConfig.TLSConfig = tlsConfig
}
influxClient, err := client.NewHTTPClient(httpConfig)
if err != nil {
errorLog.Panicf("Unable to create InfluxDB client: %s", err)
errorLog.Fatalf("Unable to create InfluxDB client: %s", err)
}
var directReadNs, changeStreamNs []string
if config.DirectReads {
Expand Down Expand Up @@ -1011,6 +1008,8 @@ func main() {
defer wg.Done()
flusher := time.NewTicker(1 * time.Second)
defer flusher.Stop()
progress := time.NewTicker(10 * time.Second)
defer progress.Stop()
influx := &InfluxCtx{
c: influxClient,
m: make(map[string]client.BatchPoints),
Expand All @@ -1021,13 +1020,19 @@ func main() {
tokens: bson.M{},
}
if err := influx.setupMeasurements(); err != nil {
errorLog.Panicf("Configuration error: %s", err)
errorLog.Fatalf("Configuration error: %s", err)
}
for {
select {
case <-progress.C:
if err := influx.saveTs(); err != nil {
exitStatus = 1
errorLog.Println(err)
}
case <-flusher.C:
if err := influx.writeBatch(); err != nil {
gtmCtx.ErrC <- err
exitStatus = 1
errorLog.Println(err)
}
case err = <-gtmCtx.ErrC:
if err == nil {
Expand All @@ -1042,12 +1047,14 @@ func main() {
exitStatus = 1
errorLog.Println(err)
}
influx.saveTs()
return
}
break
}
if err := influx.addPoint(op); err != nil {
gtmCtx.ErrC <- err
exitStatus = 1
errorLog.Println(err)
}
}
}
Expand Down

0 comments on commit 9b9bf6e

Please sign in to comment.