Skip to content

Commit

Permalink
enhance: implement kafka for wal (milvus-io#38598)
Browse files (browse the repository at this point in the history)
issue: milvus-io#38399

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored and NicoYuan1986 committed Dec 26, 2024
1 parent 5032a3c commit f3de27d
Show file tree
Hide file tree
Showing 23 changed files with 614 additions and 41 deletions.
1 change: 1 addition & 0 deletions internal/streamingnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *scannerAdaptorImpl) executeConsume() {
Message: s.pendingQueue.Next(),
})
if handleResult.Error != nil {
s.Finish(err)
s.Finish(handleResult.Error)
return
}
if handleResult.MessageHandled {
Expand Down
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (kc *kafkaClient) Subscribe(ctx context.Context, options mqwrapper.Consumer
}

func (kc *kafkaClient) EarliestMessageID() common.MessageID {
return &kafkaID{messageID: int64(kafka.OffsetBeginning)}
return &KafkaID{MessageID: int64(kafka.OffsetBeginning)}
}

func (kc *kafkaClient) StringToMsgID(id string) (common.MessageID, error) {
Expand All @@ -250,7 +250,7 @@ func (kc *kafkaClient) StringToMsgID(id string) (common.MessageID, error) {
return nil, err
}

return &kafkaID{messageID: offset}, nil
return &KafkaID{MessageID: offset}, nil
}

func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafka.ConfigMap) {
Expand All @@ -265,7 +265,7 @@ func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafk

func (kc *kafkaClient) BytesToMsgID(id []byte) (common.MessageID, error) {
offset := DeserializeKafkaID(id)
return &kafkaID{messageID: offset}, nil
return &KafkaID{MessageID: offset}, nil
}

func (kc *kafkaClient) Close() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) {
Consume1(ctx1, t, kc, topic, subName, c, &total1)

lastMsgID := <-c
log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*kafkaID).messageID))
log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*KafkaID).MessageID))

ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second)
Consume2(ctx2, t, kc, topic, subName, lastMsgID, &total2)
Expand Down
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newKafkaConsumer(config *kafka.ConfigMap, bufSize int64, topic string, grou
return nil, err
}
} else {
offset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
offset = kafka.Offset(latestMsgID.(*KafkaID).MessageID)
kc.skipMsg = true
}
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (kc *Consumer) Seek(id common.MessageID, inclusive bool) error {
return errors.New("kafka consumer is already assigned, can not seek again")
}

offset := kafka.Offset(id.(*kafkaID).messageID)
offset := kafka.Offset(id.(*KafkaID).MessageID)
return kc.internalSeek(offset, inclusive)
}

Expand Down Expand Up @@ -219,7 +219,7 @@ func (kc *Consumer) GetLatestMsgID() (common.MessageID, error) {
}

log.Info("get latest msg ID ", zap.String("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
return &kafkaID{messageID: high}, nil
return &KafkaID{MessageID: high}, nil
}

func (kc *Consumer) CheckTopicValid(topic string) error {
Expand Down
14 changes: 7 additions & 7 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) {
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)

msgID := &kafkaID{messageID: 1}
msgID := &KafkaID{MessageID: 1}
err = consumer.Seek(msgID, false)
assert.NoError(t, err)

msg := <-consumer.Chan()
assert.Equal(t, 333, BytesToInt(msg.Payload()))
assert.Equal(t, "333", msg.Properties()[common.TraceIDKey])
assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID)
assert.Equal(t, int64(2), msg.ID().(*KafkaID).MessageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 1)
}
Expand All @@ -66,14 +66,14 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) {
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)

msgID := &kafkaID{messageID: 1}
msgID := &KafkaID{MessageID: 1}
err = consumer.Seek(msgID, true)
assert.NoError(t, err)

msg := <-consumer.Chan()
assert.Equal(t, 222, BytesToInt(msg.Payload()))
assert.Equal(t, "222", msg.Properties()[common.TraceIDKey])
assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID)
assert.Equal(t, int64(1), msg.ID().(*KafkaID).MessageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 1)
}
Expand All @@ -88,7 +88,7 @@ func TestKafkaConsumer_GetSeek(t *testing.T) {
assert.NoError(t, err)
defer consumer.Close()

msgID := &kafkaID{messageID: 0}
msgID := &KafkaID{MessageID: 0}
err = consumer.Seek(msgID, false)
assert.NoError(t, err)

Expand Down Expand Up @@ -163,15 +163,15 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
defer consumer.Close()

latestMsgID, err := consumer.GetLatestMsgID()
assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID)
assert.Equal(t, int64(0), latestMsgID.(*KafkaID).MessageID)
assert.NoError(t, err)

data1 := []int{111, 222, 333}
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)

latestMsgID, err = consumer.GetLatestMsgID()
assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID)
assert.Equal(t, int64(2), latestMsgID.(*KafkaID).MessageID)
assert.NoError(t, err)
}

Expand Down
28 changes: 17 additions & 11 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,32 @@ import (
mqcommon "github.com/milvus-io/milvus/pkg/mq/common"
)

type kafkaID struct {
messageID int64
func NewKafkaID(messageID int64) mqcommon.MessageID {
return &KafkaID{
MessageID: messageID,
}
}

var _ mqcommon.MessageID = &kafkaID{}
type KafkaID struct {
MessageID int64
}

var _ mqcommon.MessageID = &KafkaID{}

func (kid *kafkaID) Serialize() []byte {
return SerializeKafkaID(kid.messageID)
func (kid *KafkaID) Serialize() []byte {
return SerializeKafkaID(kid.MessageID)
}

func (kid *kafkaID) AtEarliestPosition() bool {
return kid.messageID <= 0
func (kid *KafkaID) AtEarliestPosition() bool {
return kid.MessageID <= 0
}

func (kid *kafkaID) Equal(msgID []byte) (bool, error) {
return kid.messageID == DeserializeKafkaID(msgID), nil
func (kid *KafkaID) Equal(msgID []byte) (bool, error) {
return kid.MessageID == DeserializeKafkaID(msgID), nil
}

func (kid *kafkaID) LessOrEqualThan(msgID []byte) (bool, error) {
return kid.messageID <= DeserializeKafkaID(msgID), nil
func (kid *KafkaID) LessOrEqualThan(msgID []byte) (bool, error) {
return kid.MessageID <= DeserializeKafkaID(msgID), nil
}

func SerializeKafkaID(messageID int64) []byte {
Expand Down
18 changes: 9 additions & 9 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ import (
)

func TestKafkaID_Serialize(t *testing.T) {
rid := &kafkaID{messageID: 8}
rid := &KafkaID{MessageID: 8}
bin := rid.Serialize()
assert.NotNil(t, bin)
assert.NotZero(t, len(bin))
}

func TestKafkaID_AtEarliestPosition(t *testing.T) {
rid := &kafkaID{messageID: 8}
rid := &KafkaID{MessageID: 8}
assert.False(t, rid.AtEarliestPosition())

rid = &kafkaID{messageID: 0}
rid = &KafkaID{MessageID: 0}
assert.True(t, rid.AtEarliestPosition())
}

func TestKafkaID_LessOrEqualThan(t *testing.T) {
{
rid1 := &kafkaID{messageID: 8}
rid2 := &kafkaID{messageID: 0}
rid1 := &KafkaID{MessageID: 8}
rid2 := &KafkaID{MessageID: 0}
ret, err := rid1.LessOrEqualThan(rid2.Serialize())
assert.NoError(t, err)
assert.False(t, ret)
Expand All @@ -35,17 +35,17 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) {
}

{
rid1 := &kafkaID{messageID: 0}
rid2 := &kafkaID{messageID: 0}
rid1 := &KafkaID{MessageID: 0}
rid2 := &KafkaID{MessageID: 0}
ret, err := rid1.LessOrEqualThan(rid2.Serialize())
assert.NoError(t, err)
assert.True(t, ret)
}
}

func TestKafkaID_Equal(t *testing.T) {
rid1 := &kafkaID{messageID: 0}
rid2 := &kafkaID{messageID: 1}
rid1 := &KafkaID{MessageID: 0}
rid2 := &KafkaID{MessageID: 1}

{
ret, err := rid1.Equal(rid1.Serialize())
Expand Down
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ func (km *kafkaMessage) Payload() []byte {
}

func (km *kafkaMessage) ID() common.MessageID {
kid := &kafkaID{messageID: int64(km.msg.TopicPartition.Offset)}
kid := &KafkaID{MessageID: int64(km.msg.TopicPartition.Offset)}
return kid
}
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestKafkaMessage_All(t *testing.T) {
km := &kafkaMessage{msg: msg}
properties := make(map[string]string)
assert.Equal(t, topic, km.Topic())
assert.Equal(t, int64(0), km.ID().(*kafkaID).messageID)
assert.Equal(t, int64(0), km.ID().(*KafkaID).MessageID)
assert.Nil(t, km.Payload())
assert.Equal(t, properties, km.Properties())
}
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc()

return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
return &KafkaID{MessageID: int64(m.TopicPartition.Offset)}, nil
}

func (kp *kafkaProducer) Close() {
Expand Down
13 changes: 13 additions & 0 deletions pkg/streaming/util/message/adaptor/message_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"fmt"

"github.com/apache/pulsar-client-go/pulsar"
rawKafka "github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/milvus-io/milvus/pkg/mq/common"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
mqkafka "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"
mqpulsar "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
msgkafka "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka"
msgpulsar "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
Expand All @@ -20,6 +23,8 @@ func MustGetMQWrapperIDFromMessage(messageID message.MessageID) common.MessageID
return mqpulsar.NewPulsarID(id.PulsarID())
} else if id, ok := messageID.(interface{ RmqID() int64 }); ok {
return &server.RmqID{MessageID: id.RmqID()}
} else if id, ok := messageID.(interface{ KafkaID() rawKafka.Offset }); ok {
return mqkafka.NewKafkaID(int64(id.KafkaID()))
}
panic("unsupported now")
}
Expand All @@ -31,6 +36,8 @@ func MustGetMessageIDFromMQWrapperID(commonMessageID common.MessageID) message.M
return msgpulsar.NewPulsarID(id.PulsarID())
} else if id, ok := commonMessageID.(*server.RmqID); ok {
return rmq.NewRmqID(id.MessageID)
} else if id, ok := commonMessageID.(*mqkafka.KafkaID); ok {
return msgkafka.NewKafkaID(rawKafka.Offset(id.MessageID))
}
return nil
}
Expand All @@ -48,6 +55,9 @@ func DeserializeToMQWrapperID(msgID []byte, walName string) (common.MessageID, e
case "rocksmq":
rID := server.DeserializeRmqID(msgID)
return &server.RmqID{MessageID: rID}, nil
case "kafka":
kID := mqkafka.DeserializeKafkaID(msgID)
return mqkafka.NewKafkaID(kID), nil
default:
return nil, fmt.Errorf("unsupported mq type %s", walName)
}
Expand All @@ -65,6 +75,9 @@ func MustGetMessageIDFromMQWrapperIDBytes(walName string, msgIDBytes []byte) mes
panic(err)
}
commonMsgID = mqpulsar.NewPulsarID(msgID)
case "kafka":
id := mqkafka.DeserializeKafkaID(msgIDBytes)
commonMsgID = mqkafka.NewKafkaID(id)
default:
panic("unsupported now")
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/streaming/util/message/adaptor/message_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
"github.com/stretchr/testify/assert"

msgkafka "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka"
msgpulsar "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
Expand All @@ -17,4 +18,7 @@ func TestIDConvension(t *testing.T) {
msgID := pulsar.EarliestMessageID()
id = MustGetMessageIDFromMQWrapperID(MustGetMQWrapperIDFromMessage(msgpulsar.NewPulsarID(msgID)))
assert.True(t, id.EQ(msgpulsar.NewPulsarID(msgID)))

kafkaID := MustGetMessageIDFromMQWrapperID(MustGetMQWrapperIDFromMessage(msgkafka.NewKafkaID(1)))
assert.True(t, kafkaID.EQ(msgkafka.NewKafkaID(1)))
}
Loading

0 comments on commit f3de27d

Please sign in to comment.