Skip to content

Commit

Permalink
Excessive logging (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sotirios Mantziaris authored Jun 28, 2021
1 parent 9754c3b commit 52310d1
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 47 deletions.
10 changes: 5 additions & 5 deletions client/kafka/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (ab *Builder) WithTimeout(dial time.Duration) *Builder {
return ab
}
ab.cfg.Net.DialTimeout = dial
log.Infof(fieldSetMsg, "dial timeout", dial)
log.Debugf(fieldSetMsg, "dial timeout", dial)
return ab
}

Expand All @@ -88,7 +88,7 @@ func (ab *Builder) WithVersion(version string) *Builder {
ab.errors = append(ab.errors, errors.New("failed to parse kafka version"))
return ab
}
log.Infof(fieldSetMsg, "version", version)
log.Debugf(fieldSetMsg, "version", version)
ab.cfg.Version = v

return ab
Expand All @@ -101,7 +101,7 @@ func (ab *Builder) WithRequiredAcksPolicy(ack RequiredAcks) *Builder {
ab.errors = append(ab.errors, errors.New("invalid value for required acks policy provided"))
return ab
}
log.Infof(fieldSetMsg, "required acks", ack)
log.Debugf(fieldSetMsg, "required acks", ack)
ab.cfg.Producer.RequiredAcks = sarama.RequiredAcks(ack)
return ab
}
Expand All @@ -112,13 +112,13 @@ func (ab *Builder) WithEncoder(enc encoding.EncodeFunc, contentType string) *Bui
if enc == nil {
ab.errors = append(ab.errors, errors.New("encoder is nil"))
} else {
log.Infof(fieldSetMsg, "encoder", enc)
log.Debugf(fieldSetMsg, "encoder", enc)
ab.enc = enc
}
if contentType == "" {
ab.errors = append(ab.errors, errors.New("content type is empty"))
} else {
log.Infof(fieldSetMsg, "content type", contentType)
log.Debugf(fieldSetMsg, "content type", contentType)
ab.contentType = contentType
}

Expand Down
2 changes: 1 addition & 1 deletion component/amqp/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (c *Component) subscribe() (subscription, error) {
sub.channel = ch

tag := uuid.New().String()
log.Infof("consuming messages for tag %s", tag)
log.Debugf("consuming messages for tag %s", tag)

deliveries, err := ch.Consume(c.queueCfg.queue, tag, false, false, false, false, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion component/async/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (c *consumer) consume() (<-chan amqp.Delivery, error) {
c.ch = ch

c.tag = uuid.New().String()
log.Infof("consuming messages for tag %s", c.tag)
log.Debugf("consuming messages for tag %s", c.tag)

err = ch.ExchangeDeclare(c.exchange.name, c.exchange.kind, true, false, false, false, nil)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions component/async/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (cb *Builder) WithFailureStrategy(fs FailStrategy) *Builder {
if fs > AckStrategy || fs < NackExitStrategy {
cb.errors = append(cb.errors, errors.New("invalid strategy provided"))
} else {
log.Infof(propSetMSG, "failure strategy", cb.name)
log.Debugf(propSetMSG, "failure strategy", cb.name)
cb.failStrategy = fs
}
return cb
Expand All @@ -94,7 +94,7 @@ func (cb *Builder) WithFailureStrategy(fs FailStrategy) *Builder {
// WithRetries specifies the retry events number for the component
// default value is '0'.
func (cb *Builder) WithRetries(retries uint) *Builder {
log.Infof(propSetMSG, "retries", cb.name)
log.Debugf(propSetMSG, "retries", cb.name)
cb.retries = retries
return cb
}
Expand All @@ -103,7 +103,7 @@ func (cb *Builder) WithRetries(retries uint) *Builder {
// default value is '1'
// do NOT enable concurrency value for in-order consumers, such as Kafka or FIFO SQS
func (cb *Builder) WithConcurrency(concurrency uint) *Builder {
log.Infof(propSetMSG, "concurrency", cb.name)
log.Debugf(propSetMSG, "concurrency", cb.name)
cb.concurrency = concurrency
return cb
}
Expand All @@ -115,7 +115,7 @@ func (cb *Builder) WithRetryWait(retryWait time.Duration) *Builder {
if retryWait < 0 {
cb.errors = append(cb.errors, errors.New("invalid retry wait provided"))
} else {
log.Infof(propSetMSG, "retryWait", cb.name)
log.Debugf(propSetMSG, "retryWait", cb.name)
cb.retryWait = retryWait
}
return cb
Expand Down
2 changes: 1 addition & 1 deletion component/async/kafka/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
return nil, nil, fmt.Errorf("failed to create consumer: %w", err)
}
c.cg = cg
log.Infof("consuming messages from topics '%s' using group '%s'", strings.Join(c.topics, ","), c.group)
log.Debugf("consuming messages from topics '%s' using group '%s'", strings.Join(c.topics, ","), c.group)

chMsg := make(chan async.Message, c.config.Buffer)
chErr := make(chan error, c.config.Buffer)
Expand Down
5 changes: 1 addition & 4 deletions component/async/kafka/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type Factory struct {

// New constructor.
func New(name, topic string, brokers []string, oo ...kafka.OptionFunc) (*Factory, error) {

if name == "" {
return nil, errors.New("name is required")
}
Expand All @@ -66,7 +65,6 @@ func (c *consumer) OutOfOrder() bool {

// Create a new consumer.
func (f *Factory) Create() (async.Consumer, error) {

config, err := kafka.DefaultSaramaConfig(f.name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +122,7 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
chMsg := make(chan async.Message, c.config.Buffer)
chErr := make(chan error, c.config.Buffer)

log.Infof("consuming messages from topic '%s' without using consumer group", c.topic)
log.Debugf("consuming messages from topic '%s' without using consumer group", c.topic)
var pcs []sarama.PartitionConsumer

pcs, err := c.partitions(ctx)
Expand Down Expand Up @@ -170,7 +168,6 @@ func (c *consumer) Consume(ctx context.Context) (<-chan async.Message, <-chan er
}

func (c *consumer) partitionsFromOffset(_ context.Context) ([]sarama.PartitionConsumer, error) {

ms, err := sarama.NewConsumer(c.config.Brokers, c.config.SaramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create simple consumer: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion component/grpc/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c *Component) Run(ctx context.Context) error {
c.srv.GracefulStop()
}()

log.Infof("gRPC component listening on port %d", c.port)
log.Debugf("gRPC component listening on port %d", c.port)
return c.srv.Serve(lis)
}

Expand Down
24 changes: 12 additions & 12 deletions component/http/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *Component) Run(ctx context.Context) error {

select {
case <-ctx.Done():
log.Info("shutting down component")
log.Info("shutting down HTTP component")
tctx, cancel := context.WithTimeout(context.Background(), c.shutdownGracePeriod)
defer cancel()
return srv.Shutdown(tctx)
Expand All @@ -68,11 +68,11 @@ func (c *Component) Run(ctx context.Context) error {

func (c *Component) listenAndServe(srv *http.Server, ch chan<- error) {
if c.certFile != "" && c.keyFile != "" {
log.Infof("HTTPS component listening on port %d", c.httpPort)
log.Debugf("HTTPS component listening on port %d", c.httpPort)
ch <- srv.ListenAndServeTLS(c.certFile, c.keyFile)
}

log.Infof("HTTP component listening on port %d", c.httpPort)
log.Debugf("HTTP component listening on port %d", c.httpPort)
ch <- srv.ListenAndServe()
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func (cb *Builder) WithSSL(c, k string) *Builder {
if c == "" || k == "" {
cb.errors = append(cb.errors, errors.New("invalid cert or key provided"))
} else {
log.Info("setting cert file and key")
log.Debug("setting cert file and key")
cb.certFile = c
cb.keyFile = k
}
Expand All @@ -158,7 +158,7 @@ func (cb *Builder) WithRoutesBuilder(rb *RoutesBuilder) *Builder {
if rb == nil {
cb.errors = append(cb.errors, errors.New("route builder is nil"))
} else {
log.Info("setting route builder")
log.Debug("setting route builder")
cb.routesBuilder = rb
}
return cb
Expand All @@ -169,7 +169,7 @@ func (cb *Builder) WithMiddlewares(mm ...MiddlewareFunc) *Builder {
if len(mm) == 0 {
cb.errors = append(cb.errors, errors.New("empty list of middlewares provided"))
} else {
log.Info("setting middlewares")
log.Debug("setting middlewares")
cb.middlewares = append(cb.middlewares, mm...)
}

Expand All @@ -181,7 +181,7 @@ func (cb *Builder) WithReadTimeout(rt time.Duration) *Builder {
if rt <= 0*time.Second {
cb.errors = append(cb.errors, errors.New("negative or zero read timeout provided"))
} else {
log.Infof("setting read timeout")
log.Debug("setting read timeout")
cb.httpReadTimeout = rt
}

Expand All @@ -193,7 +193,7 @@ func (cb *Builder) WithWriteTimeout(wt time.Duration) *Builder {
if wt <= 0*time.Second {
cb.errors = append(cb.errors, errors.New("negative or zero write timeout provided"))
} else {
log.Infof("setting write timeout")
log.Debug("setting write timeout")
cb.httpWriteTimeout = wt
}

Expand Down Expand Up @@ -234,7 +234,7 @@ func (cb *Builder) WithShutdownGracePeriod(gp time.Duration) *Builder {
if gp <= 0*time.Second {
cb.errors = append(cb.errors, errors.New("negative or zero shutdown grace period provided"))
} else {
log.Infof("setting shutdown grace period")
log.Debug("setting shutdown grace period")
cb.shutdownGracePeriod = gp
}

Expand All @@ -246,7 +246,7 @@ func (cb *Builder) WithPort(p int) *Builder {
if p <= 0 || p > 65535 {
cb.errors = append(cb.errors, errors.New("invalid HTTP Port provided"))
} else {
log.Infof("setting port")
log.Debug("setting port")
cb.httpPort = p
}

Expand All @@ -258,7 +258,7 @@ func (cb *Builder) WithAliveCheckFunc(acf AliveCheckFunc) *Builder {
if acf == nil {
cb.errors = append(cb.errors, errors.New("nil AliveCheckFunc was provided"))
} else {
log.Infof("setting aliveness check")
log.Debug("setting aliveness check")
cb.ac = acf
}

Expand All @@ -270,7 +270,7 @@ func (cb *Builder) WithReadyCheckFunc(rcf ReadyCheckFunc) *Builder {
if rcf == nil {
cb.errors = append(cb.errors, errors.New("nil ReadyCheckFunc provided"))
} else {
log.Infof("setting readiness check")
log.Debug("setting readiness check")
cb.rc = rcf
}

Expand Down
3 changes: 1 addition & 2 deletions component/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func (w *responseWriter) Header() http.Header {

// Write to the internal responseWriter and sets the status if not set already.
func (w *responseWriter) Write(d []byte) (int, error) {

value, err := w.writer.Write(d)
if err != nil {
return value, err
Expand Down Expand Up @@ -154,7 +153,7 @@ func NewRateLimitingMiddleware(limiter *rate.Limiter) MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
log.Info("Limiting requests...")
log.Debug("Limiting requests...")
http.Error(w, "Requests greater than limit", http.StatusTooManyRequests)
return
}
Expand Down
2 changes: 1 addition & 1 deletion component/kafka/group/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *Component) processing(ctx context.Context) error {
}

if client != nil {
log.Infof("consuming messages from topics '%#v' using group '%s'", c.topics, c.group)
log.Debugf("consuming messages from topics '%#v' using group '%s'", c.topics, c.group)
for {
// check if context was cancelled or deadline exceeded, signaling that the consumer should stop
if ctx.Err() != nil {
Expand Down
1 change: 1 addition & 0 deletions examples/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func main() {
log.Fatalf("failed to create and run service %v", err)
}
}

func getHandler(_ context.Context, _ *patronhttp.Request) (*patronhttp.Response, error) {
return patronhttp.NewResponse(fmt.Sprint("Testing Middleware", http.StatusOK)), nil
}
Expand Down
2 changes: 1 addition & 1 deletion examples/sqs-simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func sampleSqs() (*async.Component, error) {
}

func messageHandler(message async.Message) error {
log.Info("Received message, payload:", string(message.Payload()))
log.Infof("Received message, payload: %s", string(message.Payload()))
time.Sleep(3 * time.Second) // useful to see concurrency in action
return nil
}
Loading

0 comments on commit 52310d1

Please sign in to comment.