diff --git a/README.md b/README.md index 50d67d0..3274183 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,7 @@ b, err := bot.New("YOUR_BOT_TOKEN_FROM_BOTFATHER", opts...) - `WithServerURL(serverURL string)` - set server url - `WithSkipGetMe()` - skip call GetMe on bot init - `WithAllowedUpdates(params AllowedUpdates)` - set [allowed_updates](https://core.telegram.org/bots/api#getupdates) for getUpdates method +- `WithUpdatesChannelCap(cap int)` - set updates channel capacity, by default 1024 ## Message.Text and CallbackQuery.Data handlers diff --git a/get_updates.go b/get_updates.go index 005cd6e..7687f48 100644 --- a/get_updates.go +++ b/get_updates.go @@ -73,9 +73,10 @@ func (b *Bot) getUpdates(ctx context.Context, wg *sync.WaitGroup) { for _, upd := range updates { atomic.StoreInt64(&b.lastUpdateID, upd.ID) select { + case <-ctx.Done(): + b.error("some updates lost, ctx done") + return case b.updates <- upd: - default: - b.error("error send update to processing, channel is full") } } } diff --git a/options.go b/options.go index 73ce550..da3a0fd 100644 --- a/options.go +++ b/options.go @@ -2,6 +2,8 @@ package bot import ( "time" + + "github.com/go-telegram/bot/models" ) // Option is a function that configures a bot. @@ -93,3 +95,10 @@ func WithAllowedUpdates(params AllowedUpdates) Option { b.allowedUpdates = params } } + +// WithUpdatesChannelCap allows setting custom capacity for the Updates channel +func WithUpdatesChannelCap(cap int) Option { + return func(b *Bot) { + b.updates = make(chan *models.Update, cap) + } +} diff --git a/wait_updates.go b/wait_updates.go index 0ab4619..1ff92be 100644 --- a/wait_updates.go +++ b/wait_updates.go @@ -3,48 +3,17 @@ package bot import ( "context" "sync" - "time" - - "github.com/go-telegram/bot/models" ) -// waitUpdates listen Updates channel and spawn goroutines if needed. It's a simple worker pool +// waitUpdates listen Updates channel and run ProcessUpdate func (b *Bot) waitUpdates(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - taskQueue := make(chan *models.Update) - for { select { case <-ctx.Done(): - return case upd := <-b.updates: - select { - case taskQueue <- upd: - default: - wg.Add(1) - go func(ctx context.Context, wg *sync.WaitGroup, taskQueue chan *models.Update) { - defer wg.Done() - - b.ProcessUpdate(ctx, upd) - - const cleanupDuration = 10 * time.Second - cleanupTicker := time.NewTicker(cleanupDuration) - defer cleanupTicker.Stop() - - for { - select { - case <-ctx.Done(): - return - case upd := <-taskQueue: - b.ProcessUpdate(ctx, upd) - cleanupTicker.Reset(cleanupDuration) - case <-cleanupTicker.C: - return - } - } - }(ctx, wg, taskQueue) - } + b.ProcessUpdate(ctx, upd) } } } diff --git a/webhook_handler.go b/webhook_handler.go index aee864c..1734a3f 100644 --- a/webhook_handler.go +++ b/webhook_handler.go @@ -29,9 +29,10 @@ func (b *Bot) WebhookHandler() http.HandlerFunc { } select { + case <-req.Context().Done(): + b.error("some updates lost, ctx done") + return case b.updates <- update: - default: - b.error("error send update to processing, channel is full") } } }