Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic send and batch send splits for milestone 1.5 #367

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ha/ha_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
for event := range channelClose {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
c.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", c.getInfo())
c.bootstrap = false
err, reconnected := retry(0, c)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
go func() {
for event := range channelClose {
if strings.EqualFold(event.Reason, stream.SocketClosed) || strings.EqualFold(event.Reason, stream.MetaDataUpdate) {
p.setStatus(StatusReconnecting)
logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting..", p.getInfo())
err, reconnected := retry(0, p)
if err != nil {
logs.LogInfo(""+
logs.LogInfo(
"[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err)
}
if reconnected {
Expand Down
1 change: 0 additions & 1 deletion pkg/ha/reliable_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type IReliable interface {
// In both cases it retries the reconnection

func retry(backoff int, reliable IReliable) (error, bool) {
reliable.setStatus(StatusReconnecting)
sleepValue := rand.Intn(int((reliable.getTimeOut().Seconds()-2+1)+2)*1000) + backoff*1000
logs.LogInfo("[Reliable] - The %s for the stream %s is in reconnection in %d milliseconds", reliable.getInfo(), reliable.getStreamName(), sleepValue)
time.Sleep(time.Duration(sleepValue) * time.Millisecond)
Expand Down
40 changes: 38 additions & 2 deletions pkg/stream/blocking_queue.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,43 @@
package stream

import "time"
import (
"errors"
"sync/atomic"
"time"
)

var ErrBlockingQueueStopped = errors.New("blocking queue stopped")

type BlockingQueue[T any] struct {
queue chan T
capacity int
status int32
}

// NewBlockingQueue initializes a new BlockingQueue with the given capacity
func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] {
return &BlockingQueue[T]{
queue: make(chan T, capacity),
capacity: capacity,
status: 0,
}
}

// Enqueue adds an item to the queue, blocking if the queue is full
func (bq *BlockingQueue[T]) Enqueue(item T) {
func (bq *BlockingQueue[T]) Enqueue(item T) error {
if bq.IsStopped() {
return ErrBlockingQueueStopped
}
bq.queue <- item // This will block if the queue is full
return nil
}

// Dequeue removes an item from the queue with a timeout
func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T {
if bq.IsStopped() {
var zeroValue T // Zero value of type T
return zeroValue
}
select {
case item := <-bq.queue:
return item
Expand All @@ -38,3 +54,23 @@ func (bq *BlockingQueue[T]) Size() int {
func (bq *BlockingQueue[T]) IsEmpty() bool {
return len(bq.queue) == 0
}

// Stop stops the queue from accepting new items
// but allows the existing items to be processed
// Stop is different from Close in that it allows the
// existing items to be processed.
// That avoids the need to drain the queue before closing it.
func (bq *BlockingQueue[T]) Stop() {
atomic.StoreInt32(&bq.status, 1)
}

func (bq *BlockingQueue[T]) Close() {
if bq.IsStopped() {
atomic.StoreInt32(&bq.status, 2)
close(bq.queue)
}
}

func (bq *BlockingQueue[T]) IsStopped() bool {
return atomic.LoadInt32(&bq.status) == 1 || atomic.LoadInt32(&bq.status) == 2
}
2 changes: 1 addition & 1 deletion pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
res := c.internalDeclarePublisher(streamName, producer)
if res.Err == nil {
producer.startUnconfirmedMessagesTimeOutTask()
producer.processSendingMessages()
producer.processPendingSequencesQueue()
}
return producer, res.Err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ func lookErrorCode(errorCode uint16) error {
return InternalError
case responseCodeAuthenticationFailureLoopback:
return AuthenticationFailureLoopbackError
case timeoutError:
return ConfirmationTimoutError
default:
{
logs.LogWarn("Error not handled %d", errorCode)
Expand Down
55 changes: 37 additions & 18 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,20 @@ func (coordinator *Coordinator) NewProducer(
return nil, err
}
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
timeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}),
status: open,
//dynamicSendCh: make(chan *messageSequence, dynSize),
pendingMessagesQueue: NewBlockingQueue[*messageSequence](dynSize),
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
confirmationTimeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}, 1),
status: open,
pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize),
}
coordinator.producers[lastId] = producer
return producer, err
}

func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) error {
consumer, err := coordinator.GetConsumerById(id)
consumer, err := coordinator.ExtractConsumerById(id)
if err != nil {
return err
}
Expand All @@ -92,27 +91,23 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event)
close(closeHandler)
}

return coordinator.removeById(id, coordinator.consumers)
return nil
}
func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error {

producer, err := coordinator.GetProducerById(id)
producer, err := coordinator.ExtractProducerById(id)
if err != nil {
return err
}
reason.StreamName = producer.GetStreamName()
reason.Name = producer.GetName()
tentatives := 0
for producer.lenUnConfirmed() > 0 && tentatives < 3 {
time.Sleep(200 * time.Millisecond)
tentatives++
}

if producer.closeHandler != nil {
producer.closeHandler <- reason
}

return coordinator.removeById(id, coordinator.producers)
producer.stopAndWaitPendingSequencesQueue()

return nil
}

func (coordinator *Coordinator) RemoveResponseById(id interface{}) error {
Expand Down Expand Up @@ -221,6 +216,18 @@ func (coordinator *Coordinator) GetConsumerById(id interface{}) (*Consumer, erro
return v.(*Consumer), err
}

func (coordinator *Coordinator) ExtractConsumerById(id interface{}) (*Consumer, error) {
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
if coordinator.consumers[id] == nil {
return nil, errors.New("item #{id} not found ")
}
consumer := coordinator.consumers[id].(*Consumer)
coordinator.consumers[id] = nil
delete(coordinator.consumers, id)
return consumer, nil
}

func (coordinator *Coordinator) GetResponseById(id uint32) (*Response, error) {
v, err := coordinator.getById(fmt.Sprintf("%d", id), coordinator.responses)
if err != nil {
Expand All @@ -241,6 +248,18 @@ func (coordinator *Coordinator) GetProducerById(id interface{}) (*Producer, erro
return v.(*Producer), err
}

func (coordinator *Coordinator) ExtractProducerById(id interface{}) (*Producer, error) {
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
if coordinator.producers[id] == nil {
return nil, errors.New("item #{id} not found ")
}
producer := coordinator.producers[id].(*Producer)
coordinator.producers[id] = nil
delete(coordinator.producers, id)
return producer, nil
}

// general functions

func (coordinator *Coordinator) getById(id interface{}, refmap map[interface{}]interface{}) (interface{}, error) {
Expand Down
Loading
Loading