Skip to content

Commit

Permalink
fix pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzyildirim committed Aug 20, 2023
1 parent b4f0bf6 commit 96dfed1
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 99 deletions.
2 changes: 1 addition & 1 deletion batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

if cfg.APIEnabled {
c.base.setupAPI(cfg, *c.metric, c.base.cronsumer.GetMetricCollectors()...)
c.base.setupAPI(cfg, c.metric, c.base.cronsumer.GetMetricCollectors()...)
}

return &c, nil
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

if cfg.APIEnabled {
c.base.setupAPI(cfg, *c.metric, c.base.cronsumer.GetMetricCollectors()...)
c.base.setupAPI(cfg, c.metric, c.base.cronsumer.GetMetricCollectors()...)
}

return &c, nil
Expand Down
4 changes: 2 additions & 2 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa
c.subprocesses.Add(c.cronsumer)
}

func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric ConsumerMetric, metricCollectors ...prometheus.Collector) {
func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) {
c.logger.Debug("Initializing API")
c.api = NewAPI(cfg, &consumerMetric, metricCollectors...)
c.api = NewAPI(cfg, consumerMetric, metricCollectors...)
c.subprocesses.Add(c.api)
}

Expand Down
190 changes: 95 additions & 95 deletions examples/with-grafana/main.go
Original file line number Diff line number Diff line change
@@ -1,95 +1,95 @@
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"strconv"
"time"

"github.com/Trendyol/kafka-konsumer"
)

type user struct {
Name string
ID int
}

var messages = []user{
{ID: 1, Name: "foo"},
{ID: 2, Name: "bar"},
{ID: 3, Name: "baz"},
{ID: 4, Name: "qux"},
{ID: 5, Name: "fred"},
}

func main() {
// create new kafka producer
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})
defer producer.Close()

go func() {
// produce messages at 1 seconds interval
i := 0
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
if i == len(messages) {
break
}
message := messages[i]
bytes, _ := json.Marshal(message)

_ = producer.Produce(context.Background(), kafka.Message{
Topic: "konsumer",
Key: []byte(strconv.Itoa(message.ID)),
Value: bytes,
})
i++
}
}()

consumerCfg := &kafka.ConsumerConfig{
APIEnabled: true,
Concurrency: 1,
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "konsumer",
GroupID: "konsumer.group.test",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "konsumer-retry",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
ConsumeFn: func(message kafka.Message) error {
// mocking some background task
time.Sleep(1 * time.Second)

fmt.Printf("Message from %s with value %s is consumed successfully\n", message.Topic, string(message.Value))
return nil
},
}

// create new kafka consumer
consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

// start consumer
consumer.Consume()

fmt.Println("Consumer started!")

// wait for interrupt signal to gracefully shut down the consumer
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}
package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/signal"
"strconv"
"time"

"github.com/Trendyol/kafka-konsumer"
)

type user struct {
Name string
ID int
}

var messages = []user{
{ID: 1, Name: "foo"},
{ID: 2, Name: "bar"},
{ID: 3, Name: "baz"},
{ID: 4, Name: "qux"},
{ID: 5, Name: "fred"},
}

func main() {
// create new kafka producer
producer, _ := kafka.NewProducer(kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})
defer producer.Close()

go func() {
// produce messages at 1 seconds interval
i := 0
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
if i == len(messages) {
break
}
message := messages[i]
bytes, _ := json.Marshal(message)

_ = producer.Produce(context.Background(), kafka.Message{
Topic: "konsumer",
Key: []byte(strconv.Itoa(message.ID)),
Value: bytes,
})
i++
}
}()

consumerCfg := &kafka.ConsumerConfig{
APIEnabled: true,
Concurrency: 1,
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "konsumer",
GroupID: "konsumer.group.test",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "konsumer-retry",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
ConsumeFn: func(message kafka.Message) error {
// mocking some background task
time.Sleep(1 * time.Second)

fmt.Printf("Message from %s with value %s is consumed successfully\n", message.Topic, string(message.Value))
return nil
},
}

// create new kafka consumer
consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

// start consumer
consumer.Consume()

fmt.Println("Consumer started!")

// wait for interrupt signal to gracefully shut down the consumer
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

0 comments on commit 96dfed1

Please sign in to comment.