Skip to content

Commit

Permalink
remove worker pool (#73)
Browse files Browse the repository at this point in the history
* remove worker pool

* add option WithUpdatesChannelCap

* update readme
  • Loading branch information
negasus authored Apr 2, 2024
1 parent 18a19b8 commit 158a3c1
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 37 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ b, err := bot.New("YOUR_BOT_TOKEN_FROM_BOTFATHER", opts...)
- `WithServerURL(serverURL string)` - set server url
- `WithSkipGetMe()` - skip call GetMe on bot init
- `WithAllowedUpdates(params AllowedUpdates)` - set [allowed_updates](https://core.telegram.org/bots/api#getupdates) for getUpdates method
- `WithUpdatesChannelCap(cap int)` - set updates channel capacity, by default 1024

## Message.Text and CallbackQuery.Data handlers

Expand Down
5 changes: 3 additions & 2 deletions get_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) {
for _, upd := range updates {
atomic.StoreInt64(&b.lastUpdateID, upd.ID)
select {
case <-ctx.Done():
b.error("some updates lost, ctx done")
return
case b.updates <- upd:
default:
b.error("error send update to processing, channel is full")
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package bot

import (
"time"

"github.com/go-telegram/bot/models"
)

// Option is a function that configures a bot.
Expand Down Expand Up @@ -93,3 +95,10 @@ func WithAllowedUpdates(params AllowedUpdates) Option {
b.allowedUpdates = params
}
}

// WithUpdatesChannelCap allows setting custom capacity for the Updates channel
func WithUpdatesChannelCap(cap int) Option {
return func(b *Bot) {
b.updates = make(chan *models.Update, cap)
}
}
35 changes: 2 additions & 33 deletions wait_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,17 @@ package bot
import (
"context"
"sync"
"time"

"github.com/go-telegram/bot/models"
)

// waitUpdates listen Updates channel and spawn goroutines if needed. It's a simple worker pool
// waitUpdates listen Updates channel and run ProcessUpdate
func (b *Bot) waitUpdates(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

taskQueue := make(chan *models.Update)

for {
select {
case <-ctx.Done():
return
case upd := <-b.updates:
select {
case taskQueue <- upd:
default:
wg.Add(1)
go func(ctx context.Context, wg *sync.WaitGroup, taskQueue chan *models.Update) {
defer wg.Done()

b.ProcessUpdate(ctx, upd)

const cleanupDuration = 10 * time.Second
cleanupTicker := time.NewTicker(cleanupDuration)
defer cleanupTicker.Stop()

for {
select {
case <-ctx.Done():
return
case upd := <-taskQueue:
b.ProcessUpdate(ctx, upd)
cleanupTicker.Reset(cleanupDuration)
case <-cleanupTicker.C:
return
}
}
}(ctx, wg, taskQueue)
}
b.ProcessUpdate(ctx, upd)
}
}
}
5 changes: 3 additions & 2 deletions webhook_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ func (b *Bot) WebhookHandler() http.HandlerFunc {
}

select {
case <-req.Context().Done():
b.error("some updates lost, ctx done")
return
case b.updates <- update:
default:
b.error("error send update to processing, channel is full")
}
}
}

0 comments on commit 158a3c1

Please sign in to comment.