Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
小滋润 committed Nov 8, 2024
1 parent 856196b commit 1692d15
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 237 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/develop-CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Develop CI
on:
workflow_dispatch:
push:
branches: [ "develop" ]
branches: [ "develop" ,"test"]

jobs:
test-and-update-docs:
Expand Down
115 changes: 78 additions & 37 deletions global/nats/manager/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,107 @@ package manager
import (
"context"
"errors"
"runtime/debug"
"strings"
"time"

"github.com/ZiRunHua/LeapLedger/util/dataTool"
"github.com/nats-io/nats.go/jetstream"
"go.uber.org/zap"
)

// consumerManger is used to manage the consumer group in the stream
// It records the consumption method of the consumption group,
// which will help with the retry of messages in the dead letter queue
type consumerManger struct {
stream jetstream.Stream
consumer jetstream.Consumer
consumerMessageHandler dataTool.Map[string, func(msg jetstream.Msg) error]
}
type (
// ConsumerManger is used to manage the consumer group in the stream
// It records the consumption method of the consumption group,
// which will help with the retry of messages in the dead letter queue
ConsumerManger interface {
NewConsumer(context.Context, func(*jetstream.ConsumerConfig) error, consumerMessageHandler) (
jetstream.Consumer, error,
)
Consume(context.Context, jetstream.Consumer, consumerMessageHandler) error
ReConsume(ctx context.Context, consumerName string, msg *jetstream.RawStreamMsg) error
UpdateAllConsumerConfig(func(*jetstream.ConsumerConfig) error, context.Context) error
}

func (cm *consumerManger) init(
ctx context.Context,
stream jetstream.Stream,
mainConsumer jetstream.ConsumerConfig) error {
cm.stream, cm.consumerMessageHandler = stream, dataTool.NewSyncMap[string, func(msg jetstream.Msg) error]()
_, err := cm.createOrUpdateConsumer(ctx, mainConsumer)
if err != nil {
return err
consumerManger struct {
stream jetstream.Stream
// consumer is main consumer,other consumers will be based on the configuration of this consumer
consumer jetstream.Consumer
consumerMessageHandler dataTool.Map[string, consumerMessageHandler]

logger *zap.Logger
}
cm.consumerMessageHandler.Store(mainConsumer.Name, func(msg jetstream.Msg) error { return nil })
return nil
consumerMessageHandler func(subject string, payload []byte) error
)

func NewConsumerManger(ctx context.Context, stream jetstream.Stream, consumer jetstream.Consumer, logger *zap.Logger) (
ConsumerManger, error,
) {
var cm consumerManger
cm.stream, cm.consumer, cm.logger = stream, consumer, logger
cm.consumerMessageHandler = dataTool.NewSyncMap[string, consumerMessageHandler]()
return &cm, cm.Consume(ctx, cm.consumer, func(_ string, _ []byte) error { return nil })
}

func (cm *consumerManger) reConsume(_ context.Context, consumerName string, msg jetstream.Msg) error {
func (cm *consumerManger) ReConsume(_ context.Context, consumerName string, msg *jetstream.RawStreamMsg) error {
handler, exist := cm.consumerMessageHandler.Load(consumerName)
if !exist {
return errors.New("consumer message handler not found")
}
return handler(msg)
return handler(msg.Subject, msg.Data)
}

func (cm *consumerManger) createOrUpdateConsumer(ctx context.Context, config jetstream.ConsumerConfig) (
consumer jetstream.Consumer, err error) {
consumer jetstream.Consumer, err error,
) {
return cm.stream.CreateOrUpdateConsumer(ctx, config)
}

func (cm *consumerManger) consume(
ctx context.Context, consumer jetstream.Consumer, handle jetstream.MessageHandler,
func (cm *consumerManger) Consume(
ctx context.Context, consumer jetstream.Consumer, handle consumerMessageHandler,
) (err error) {
info, err := consumer.Info(ctx)
if err != nil {
return err
}
_, err = consumer.Consume(handle)
_, err = consumer.Consume(cm.ReceiveMsg(handle))
if err != nil {
return err
}
cm.consumerMessageHandler.Store(info.Name, handle)
return nil
}

func (cm *consumerManger) newConsumer(ctx context.Context, setConfig func(*jetstream.ConsumerConfig) error) (
jetstream.Consumer, error,
) {
func (cm *consumerManger) ReceiveMsg(handle consumerMessageHandler) jetstream.MessageHandler {
return func(msg jetstream.Msg) {
var err error
defer func() {
if r := recover(); r != nil {
cm.logger.Panic(
msg.Subject(), zap.ByteString("data", msg.Data()),
zap.Any("panic", r), zap.Stack(string(debug.Stack())),
)
return
}
if err == nil {
err = msg.Ack()
}
if err != nil {
cm.logger.Error(
msg.Subject(), zap.ByteString("data", msg.Data()),
zap.Error(err), zap.Stack(string(debug.Stack())),
)
return
}
}()
err = handle(msg.Subject(), msg.Data())
}
}

func (cm *consumerManger) NewConsumer(
ctx context.Context,
setConfig func(*jetstream.ConsumerConfig) error,
handler consumerMessageHandler) (jetstream.Consumer, error) {
info, err := cm.consumer.Info(ctx)
if err != nil {
return nil, err
Expand All @@ -76,21 +117,21 @@ func (cm *consumerManger) newConsumer(ctx context.Context, setConfig func(*jetst
strings.Compare(config.Durable, info.Config.Durable) == 0 {
return nil, errors.New("new consumer has the same name as the main consumer")
}

consumer, err := cm.createOrUpdateConsumer(ctx, config)
if err != nil {
return nil, err
}
cm.consumerMessageHandler.Store(config.Name, func(msg jetstream.Msg) {})
if handler == nil {
handler = func(_ string, _ []byte) error { return nil }
}
_, err = consumer.Consume(cm.ReceiveMsg(handler))
if err != nil {
return nil, err
}
cm.consumerMessageHandler.Store(info.Name, handler)
return consumer, nil
}

func (cm *consumerManger) getConsumerMsgHandle(_ context.Context, name string) (
jetstream.MessageHandler, bool,
) {
return cm.consumerMessageHandler.Load(name)
}

func (cm *consumerManger) iterateConsumers(
ctx context.Context,
) (func(yield func(*jetstream.ConsumerInfo) bool), error) {
Expand All @@ -107,15 +148,15 @@ func (cm *consumerManger) iterateConsumers(
}, nil
}

func (cm *consumerManger) updateAllConsumerConfig(
func (cm *consumerManger) UpdateAllConsumerConfig(
handle func(*jetstream.ConsumerConfig) error, ctx context.Context,
) error {
consumers, err := cm.iterateConsumers(ctx)
if err != nil {
return err
}
for info := range consumers {
err = handle(info)
err = handle(&info.Config)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 1692d15

Please sign in to comment.