English | 中文
WorkQueue
is a high-performance, thread-safe, and memory-efficient Go library for managing work queues. It offers a variety of queue implementations such as Queue
, DelayingQueue
, PriorityQueue
, and RateLimitingQueue
, each tailored for specific use cases and performance needs. The library's design is simple, user-friendly, and platform-independent, making it suitable for a broad spectrum of applications and environments.
After several iterations and real-world usage, we've gathered valuable user feedback and insights. This led to a complete redesign and optimization of WorkQueue's architecture and underlying code in the new version (v2), significantly enhancing its robustness, reliability, and security.
The WorkQueue(v2) has undergone a comprehensive architectural revamp, greatly improving its robustness and reliability. This redesign enables the library to manage demanding workloads with increased stability, making it ideal for both simple task queues and complex workflows. By utilizing advanced algorithms and optimized data structures, WorkQueue(v2) provides superior performance, efficiently managing larger task volumes, reducing latency, and increasing throughput.
WorkQueue(v2) offers a diverse set of queue implementations to cater to different needs, including standard task management, delayed execution, task prioritization, and rate-limited processing. This flexibility allows you to select the most suitable tool for your specific use case, ensuring optimal performance and functionality. With its cross-platform design, WorkQueue(v2) guarantees consistent behavior and performance across various operating systems, making it a versatile solution for different environments.
The development of WorkQueue(v2) has been heavily influenced by user feedback and real-world usage, resulting in a library that better meets the needs of its users. By addressing user-reported issues and incorporating feature requests, WorkQueue(v2) offers a more refined and user-centric experience.
Choosing WorkQueue(v2) for your application or project could be a great decision. :)
-
User-Friendly: The intuitive design ensures easy usage, allowing users of varying skill levels to quickly become proficient.
-
No External Dependencies: The system operates independently, without the need for additional software or libraries, reducing compatibility issues and simplifying deployment.
-
High Performance: The system is optimized for speed and efficiency, swiftly handling tasks to enhance productivity and scalability.
-
Minimal Memory Usage: The design utilizes minimal system resources, ensuring smooth operation even on devices with limited hardware capabilities, and freeing up memory for other applications.
-
Thread-Safe: The system supports multi-threading, allowing for concurrent operations without the risk of data corruption or interference, providing a stable environment for multiple users or processes.
-
Supports Action Callback Functions: The system can execute predefined functions in response to specific events, enhancing interactivity, customization, and responsiveness.
-
Cross-Platform Compatibility: The system operates seamlessly across different operating systems and devices, providing flexibility for diverse user environments.
Following the redesign, the architecture UML diagram for WorkQueue(v2) is shown below:
The following benchmark results demonstrate the performance of the WorkQueue
library.
When a linked list undergoes data modifications, the primary changes occur in the pointers of the elements, rather than directly adding elements like dynamic arrays. Over extended periods, linked lists prove to be more memory efficient than dynamic arrays.
Direct performance
$ go test -benchmem -run=^$ -bench ^BenchmarkList* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/list
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkList_PushBack-12 186905107 6.447 ns/op 0 B/op 0 allocs/op
BenchmarkList_PushFront-12 157372052 7.701 ns/op 0 B/op 0 allocs/op
BenchmarkList_PopBack-12 179555846 6.645 ns/op 0 B/op 0 allocs/op
BenchmarkList_PopFront-12 180030582 6.989 ns/op 0 B/op 0 allocs/op
BenchmarkList_InsertBefore-12 189274771 6.406 ns/op 0 B/op 0 allocs/op
BenchmarkList_InsertAfter-12 160078981 6.490 ns/op 0 B/op 0 allocs/op
BenchmarkList_Remove-12 183250782 6.440 ns/op 0 B/op 0 allocs/op
BenchmarkList_MoveToFront-12 146021263 7.837 ns/op 0 B/op 0 allocs/op
BenchmarkList_MoveToBack-12 141336429 8.589 ns/op 0 B/op 0 allocs/op
BenchmarkList_Swap-12 100000000 10.47 ns/op 0 B/op 0 allocs/op
Compare with the standard library
Both the standard library and this project employ the same algorithm, leading to comparable performance. However, the list
in this project provides additional features compared to the standard library. Furthermore, the list
node uses sync.Pool
to minimize memory allocation. Therefore, under high concurrency, the performance of the project's list
may surpass that of the standard library.
$ go test -benchmem -run=^$ -bench ^BenchmarkCompare* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/list
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkCompareGoStdList_PushBack-12 8256513 129.4 ns/op 56 B/op 1 allocs/op
BenchmarkCompareGoStdList_PushFront-12 9448060 115.5 ns/op 55 B/op 1 allocs/op
BenchmarkCompareGoStdList_PopBack-12 178923963 23.60 ns/op 0 B/op 0 allocs/op
BenchmarkCompareGoStdList_PopFront-12 33846044 46.40 ns/op 0 B/op 0 allocs/op
BenchmarkCompareGoStdList_InsertBefore-12 12046944 93.53 ns/op 55 B/op 1 allocs/op
BenchmarkCompareGoStdList_InsertAfter-12 11364718 94.52 ns/op 55 B/op 1 allocs/op
BenchmarkCompareWQList_PushBack-12 11582172 109.7 ns/op 55 B/op 1 allocs/op
BenchmarkCompareWQList_PushFront-12 10893723 92.67 ns/op 55 B/op 1 allocs/op
BenchmarkCompareWQList_PopBack-12 181593789 6.841 ns/op 0 B/op 0 allocs/op
BenchmarkCompareWQList_PopFront-12 179179370 7.057 ns/op 0 B/op 0 allocs/op
BenchmarkCompareWQList_InsertBefore-12 9302694 116.5 ns/op 55 B/op 1 allocs/op
BenchmarkCompareWQList_InsertAfter-12 10237197 117.7 ns/op 55 B/op 1 allocs/o
Prior to version v2.1.3, this project utilized the Insertion Sort
algorithm for sorting elements within the heap. However, starting from version v2.1.3, the project has transitioned to using the Red Black Tree
algorithm for this purpose. The Red-Black Tree
algorithm, with its time complexity of O(logn)
, typically outperforms the Insertion Sort
algorithm in most scenarios.
Direct performance
The project uses the Insertion Sort
algorithm to sort elements in the heap. In a sorted array, the time complexity of the Insertion Sort
algorithm is O(n)
. In this project, a list
is used to store the elements in the heap. Each element is appended to the end of the list and then sorted.
$ go test -benchmem -run=^$ -bench ^BenchmarkHeap* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkHeap_Push-12 115560 123634 ns/op 0 B/op 0 allocs/op
BenchmarkHeap_Pop-12 176871700 10.66 ns/op 0 B/op 0 allocs/op
BenchmarkHeap_Remove-12 1000000000 1.217 ns/op 0 B/op 0 allocs/op
Compare with the standard library
The heap in this project uses the Insertion Sort
algorithm for sorting elements, while the standard library uses the container/heap
package to implement the heap. The time complexity of the standard library's sorting is O(logn)
, while the project's sorting has a time complexity of O(n)
. Therefore, the project's sorting is slower than the standard library's. However, this is due to the difference in the algorithms used, and thus, a direct comparison may not be fair.
Tip
The Insertion Sort
algorithm can provide a stable and consistent sorting, unlike the binary heap. If you have any better suggestions, please feel free to share.
$ go test -benchmem -run=^$ -bench ^BenchmarkCompare* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkCompareGoStdHeap_Push-12 4552110 278.9 ns/op 92 B/op 1 allocs/op
BenchmarkCompareGoStdHeap_Pop-12 3726718 362.9 ns/op 0 B/op 0 allocs/op
BenchmarkCompareWQHeap_Push-12 109158 121247 ns/op 48 B/op 1 allocs/op
BenchmarkCompareWQHeap_Pop-12 174782917 15.10 ns/op 0 B/op 0 allocs/op
Direct performance
The project uses the Red-Black Tree
algorithm to sort elements in the heap. The time complexity of the Red-Black Tree
algorithm is O(logn)
. In this project, a tree
is used to store the elements in the heap. Each element is added to the tree
and then sorted.
$ go test -benchmem -run=^$ -bench ^BenchmarkHeap* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkHeap_Push-12 5630415 257.5 ns/op 0 B/op 0 allocs/op
BenchmarkHeap_Pop-12 16859534 117.4 ns/op 0 B/op 0 allocs/op
BenchmarkHeap_Remove-12 148432172 8.197 ns/op 0 B/op 0 allocs/op
Compare with the standard library
The heap in this project uses the Red-Black Tree
algorithm for sorting elements, while the standard library uses the container/heap
package to implement the heap. The time complexity of the standard library's sorting is O(logn)
, while the project's sorting has a time complexity of O(logn)
. Therefore, the project's sorting is same as the standard library's. But the project's sorting is more stable and consistent than the standard library's.
$ go test -benchmem -run=^$ -bench ^BenchmarkCompare* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2/internal/container/heap
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkCompareGoStdHeap_Push-12 4368770 283.3 ns/op 110 B/op 1 allocs/op
BenchmarkCompareGoStdHeap_Pop-12 3745934 357.6 ns/op 0 B/op 0 allocs/op
BenchmarkCompareWQHeap_Push-12 4252489 350.2 ns/op 64 B/op 1 allocs/op
BenchmarkCompareWQHeap_Pop-12 15759519 116.7 ns/op 0 B/op 0 allocs/op
In essence, memory alignment enhances performance, minimizes CPU cycles, reduces power usage, boosts stability, and ensures predictable behavior. This is why it's considered a best practice to align data in memory, especially on contemporary 64-bit CPUs.
Node struct alignment:
---- Fields in struct ----
+----+----------------+-----------+-----------+
| ID | FIELDTYPE | FIELDNAME | FIELDSIZE |
+----+----------------+-----------+-----------+
| A | unsafe.Pointer | parentRef | 8 |
| B | int64 | Priority | 8 |
| C | int64 | Color | 8 |
| D | *list.Node | Left | 8 |
| E | *list.Node | Right | 8 |
| F | *list.Node | Parent | 8 |
| G | interface {} | Value | 16 |
+----+----------------+-----------+-----------+
---- Memory layout ----
|A|A|A|A|A|A|A|A|
|B|B|B|B|B|B|B|B|
|C|C|C|C|C|C|C|C|
|D|D|D|D|D|D|D|D|
|E|E|E|E|E|E|E|E|
|F|F|F|F|F|F|F|F|
|G|G|G|G|G|G|G|G|
|G|G|G|G|G|G|G|G|
total cost: 64 Bytes.
Here are the benchmark results for all queues in the WorkQueue
library.
Note
The RateLimitingQueue is quite slow due to its use of bucket-based rate limiting. It's not recommended for high-performance scenarios.
$ go test -benchmem -run=^$ -bench ^Benchmark* .
goos: darwin
goarch: amd64
pkg: github.com/shengyanli1982/workqueue/v2
cpu: Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz
BenchmarkDelayingQueue_Put-12 4635976 255.6 ns/op 72 B/op 1 allocs/op
BenchmarkDelayingQueue_PutWithDelay-12 1635588 784.7 ns/op 71 B/op 1 allocs/op
BenchmarkDelayingQueue_Get-12 24795136 47.53 ns/op 21 B/op 0 allocs/op
BenchmarkDelayingQueue_PutAndGet-12 15995890 75.25 ns/op 7 B/op 0 allocs/op
BenchmarkDelayingQueue_PutWithDelayAndGet-12 1731825 664.3 ns/op 29 B/op 1 allocs/op
BenchmarkPriorityQueue_Put-12 3030818 433.5 ns/op 71 B/op 1 allocs/op
BenchmarkPriorityQueue_PutWithPriority-12 2937105 452.0 ns/op 71 B/op 1 allocs/op
BenchmarkPriorityQueue_Get-12 11245106 134.3 ns/op 23 B/op 0 allocs/op
BenchmarkPriorityQueue_PutAndGet-12 12962031 92.24 ns/op 7 B/op 0 allocs/op
BenchmarkPriorityQueue_PutWithPriorityAndGet-12 14543769 83.70 ns/op 7 B/op 0 allocs/op
BenchmarkQueue_Put-12 6102608 206.1 ns/op 71 B/op 1 allocs/op
BenchmarkQueue_Get-12 30304675 45.30 ns/op 17 B/op 0 allocs/op
BenchmarkQueue_PutAndGet-12 17171174 71.83 ns/op 7 B/op 0 allocs/op
BenchmarkQueue_Idempotent_Put-12 1573570 706.9 ns/op 136 B/op 3 allocs/op
BenchmarkQueue_Idempotent_Get-12 2275533 534.4 ns/op 105 B/op 0 allocs/op
BenchmarkQueue_Idempotent_PutAndGet-12 2551188 494.5 ns/op 75 B/op 1 allocs/op
BenchmarkRateLimitingQueue_Put-12 5852602 214.0 ns/op 71 B/op 1 allocs/op
BenchmarkRateLimitingQueue_PutWithLimited-12 1412991 852.6 ns/op 135 B/op 2 allocs/op
BenchmarkRateLimitingQueue_Get-12 28186063 49.60 ns/op 19 B/op 0 allocs/op
BenchmarkRateLimitingQueue_PutAndGet-12 15600679 75.69 ns/op 7 B/op 0 allocs/op
BenchmarkRateLimitingQueue_PutWithLimitedAndGet-12 1395084 855.5 ns/op 135 B/op 2 allocs/op
go get github.com/shengyanli1982/workqueue/v2
For more examples on how to use WorkQueue, please refer to the examples
directory.
The Queue
is a simple FIFO (First In, First Out) queue that serves as the base for all other queues in this project. It maintains a dirty
set and a processing
set to keep track of the queue's state.
The dirty
set contains items that have been added to the queue but have not yet been processed. The processing
set contains items that are currently being processed.
Important
If you create a new queue with the WithValueIdempotent
configuration, the queue will automatically remove duplicate items. This means that if you put the same item into the queue, the queue will only keep one instance of that item.
However, the parameter for PutXXX
functions should refer to an object that can be hashed by the map
in the Go
standard library. If the object cannot be hashed, such as pointers or slices, the program may throw an error. To solve this problem, you can use WithSetCreator
to create a custom set which can handle these objects.
The Queue
has several configuration options that can be set when creating a queue.
WithCallback
: Sets callback functions.WithValueIdempotent
: Enables item idempotency for the queue.WithSetCreator
: Sets the creator function for the queue's internal set.
Shutdown
: Terminates the queue, preventing it from accepting new tasks.IsClosed
: Checks if the queue is closed, returns a boolean.Len
: Returns the number of elements in the queue.Values
: Returns all elements in the queue as a slice.Range
: Iterates over all elements in the queue.Put
: Adds an element to the queue.Get
: Retrieves an element from the queue.Done
: Notifies the queue that an element has been processed.
Note
The Done
function is only used when the queue is created with the WithValueIdempotent
option. If you don't use this option, you don't need to call this function.
OnPut
: Invoked when an item is added to the queue.OnGet
: Invoked when an item is retrieved from the queue.OnDone
: Invoked when an item has been processed.
package main
import (
"errors"
"fmt"
"sync"
"time"
wkq "github.com/shengyanli1982/workqueue/v2"
)
// consumer 函数是一个消费者函数,它从队列中获取元素并处理它们
// The consumer function is a consumer function that gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
// 当函数返回时,调用 wg.Done() 来通知 WaitGroup 一个任务已经完成
// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
defer wg.Done()
// 无限循环,直到函数返回
// Infinite loop until the function returns
for {
// 从队列中获取一个元素
// Get an element from the queue
element, err := queue.Get()
// 如果获取元素时发生错误,则处理错误
// If an error occurs when getting the element, handle the error
if err != nil {
// 如果错误不是因为队列为空,则打印错误并返回
// If the error is not because the queue is empty, print the error and return
if !errors.Is(err, wkq.ErrQueueIsEmpty) {
fmt.Println(err)
return
} else {
// 如果错误是因为队列为空,则继续循环
// If the error is because the queue is empty, continue the loop
continue
}
}
// 打印获取到的元素
// Print the obtained element
fmt.Println("> get element:", element)
// 标记元素为已处理,'Done' 是在 'Get' 之后必需的
// Mark the element as done, 'Done' is required after 'Get'
queue.Done(element)
}
}
func main() {
// 创建一个 WaitGroup,用于等待所有的 goroutine 完成
// Create a WaitGroup to wait for all goroutines to complete
wg := sync.WaitGroup{}
// 创建一个新的队列
// Create a new queue
queue := wkq.NewQueue(nil)
// 增加 WaitGroup 的计数器
// Increase the counter of the WaitGroup
wg.Add(1)
// 启动一个新的 goroutine 来运行 comsumer 函数
// Start a new goroutine to run the comsumer function
go consumer(queue, &wg)
// 将 "hello" 放入队列
// Put "hello" into the queue
_ = queue.Put("hello")
// 将 "world" 放入队列
// Put "world" into the queue
_ = queue.Put("world")
// 等待一秒钟,让 comsumer 有机会处理队列中的元素
// Wait for a second to give the comsumer a chance to process the elements in the queue
time.Sleep(time.Second)
// 关闭队列
// Shut down the queue
queue.Shutdown()
// 等待所有的 goroutine 完成
// Wait for all goroutines to complete
wg.Wait()
}
Result
$ go run demo.go
> get element: hello
> get element: world
queue is shutting down
The Delaying Queue
is a queue that supports delayed execution. It builds upon the Queue
and uses a Heap
to manage the expiration times of the elements. When you add an element to the queue, you can specify a delay time. The elements are then sorted by this delay time and executed after the specified delay has passed.
Tip
When the Delaying Queue
is empty in the Heap
or the first element is not due, it will wait every heartbeat
time for an element in the Heap
that can be processed. This means that there may be a slight deviation in the actual delay time of the element. The actual mini delay time is the "element delay time + 300ms".
If precise timing is important for your project, you may consider using the kairos
project I wrote.
The Delaying Queue
inherits the configuration of the Queue
.
WithCallback
: Sets callback functions.
The Delaying Queue
inherits the methods of the Queue
. Additionally, it introduces the following method:
PutWithDelay
: Adds an element to the queue with a specified delay.HeapRange
: Iterates over all elements in the heap.
The Delaying Queue
inherits the callbacks of the Queue
. Additionally, it introduces the following callbacks:
OnDelay
: Invoked when an element is added to the queue with a specified delay.OnPullError
: Invoked when an error occurs while pulling an element from the heap to the queue.
package main
import (
"errors"
"fmt"
"sync"
"time"
wkq "github.com/shengyanli1982/workqueue/v2"
)
// consumer 函数是一个消费者函数,它从队列中获取元素并处理它们
// The consumer function is a consumer function that gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
// 当函数返回时,调用 wg.Done() 来通知 WaitGroup 一个任务已经完成
// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
defer wg.Done()
// 无限循环,直到函数返回
// Infinite loop until the function returns
for {
// 从队列中获取一个元素
// Get an element from the queue
element, err := queue.Get()
// 如果获取元素时发生错误,则处理错误
// If an error occurs when getting the element, handle the error
if err != nil {
// 如果错误不是因为队列为空,则打印错误并返回
// If the error is not because the queue is empty, print the error and return
if !errors.Is(err, wkq.ErrQueueIsEmpty) {
fmt.Println(err)
return
} else {
// 如果错误是因为队列为空,则继续循环
// If the error is because the queue is empty, continue the loop
continue
}
}
// 打印获取到的元素
// Print the obtained element
fmt.Println("> get element:", element)
// 标记元素为已处理,'Done' 是在 'Get' 之后必需的
// Mark the element as done, 'Done' is required after 'Get'
queue.Done(element)
}
}
func main() {
// 创建一个 WaitGroup,用于等待所有的 goroutine 完成
// Create a WaitGroup to wait for all goroutines to complete
wg := sync.WaitGroup{}
// 创建一个新的队列
// Create a new queue
queue := wkq.NewDelayingQueue(nil)
// 增加 WaitGroup 的计数器
// Increase the counter of the WaitGroup
wg.Add(1)
// 启动一个新的 goroutine 来运行 consumer 函数
// Start a new goroutine to run the consumer function
go consumer(queue, &wg)
// 将 "delay 1" 放入队列,并设置其延迟时间为 200 毫秒
// Put "delay 1" into the queue and set its delay time to 200 milliseconds
_ = queue.PutWithDelay("delay 1", 200)
// 将 "delay 2" 放入队列,并设置其延迟时间为 100 毫秒
// Put "delay 2" into the queue and set its delay time to 100 milliseconds
_ = queue.PutWithDelay("delay 2", 100)
// 将 "hello" 放入队列
// Put "hello" into the queue
_ = queue.Put("hello")
// 将 "world" 放入队列
// Put "world" into the queue
_ = queue.Put("world")
// 等待一秒钟,让 comsumer 有机会处理队列中的元素
// Wait for a second to give the comsumer a chance to process the elements in the queue
time.Sleep(time.Second)
// 关闭队列
// Shut down the queue
queue.Shutdown()
// 等待所有的 goroutine 完成
// Wait for all goroutines to complete
wg.Wait()
}
Result
$ go run demo.go
> get element: hello
> get element: world
> get element: delay 2
> get element: delay 1
queue is shutting down
The Priority Queue
is a queue that facilitates prioritized execution. It is constructed on the foundation of the Queue
and employs a Heap
to manage the priorities of the elements. In the Priority Queue
, elements are sorted according to their priorities. Both Queue
and Heap
utilize the same element structure and storage.
When you add an element to the queue, you can designate its priority. The elements are subsequently sorted and executed based on these priorities. However, if an element has a very low priority and another has a very high priority, the low priority element may never be executed. Exercise Caution !!!
The Priority Queue
inherits the configuration of the Queue
.
WithCallback
: Sets callback functions.
The Priority Queue
inherits the methods of the Queue
. Additionally, it provides the following methods:
PutWithPriority
: Adds an element to the queue with a specified priority.Put
: Adds an element to the queue with a default priority (value is0
).
The Priority Queue
inherits the callbacks of the Queue
. Additionally, it provides the following callback:
OnPriority
: Invoked when an element is added to the queue with a specified priority.
Tip
Note that in the Priority Queue
, when an element is added, the OnPut
callback is not triggered. Instead, the OnPriority
callback is exclusively invoked.
package main
import (
"errors"
"fmt"
"sync"
"time"
wkq "github.com/shengyanli1982/workqueue/v2"
)
// consumer 函数是一个消费者函数,它从队列中获取元素并处理它们
// The consumer function is a consumer function that gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
// 当函数返回时,调用 wg.Done() 来通知 WaitGroup 一个任务已经完成
// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
defer wg.Done()
// 无限循环,直到函数返回
// Infinite loop until the function returns
for {
// 从队列中获取一个元素
// Get an element from the queue
element, err := queue.Get()
// 如果获取元素时发生错误,则处理错误
// If an error occurs when getting the element, handle the error
if err != nil {
// 如果错误不是因为队列为空,则打印错误并返回
// If the error is not because the queue is empty, print the error and return
if !errors.Is(err, wkq.ErrQueueIsEmpty) {
fmt.Println(err)
return
} else {
// 如果错误是因为队列为空,则继续循环
// If the error is because the queue is empty, continue the loop
continue
}
}
// 打印获取到的元素
// Print the obtained element
fmt.Println("> get element:", element)
// 标记元素为已处理,'Done' 是在 'Get' 之后必需的
// Mark the element as done, 'Done' is required after 'Get'
queue.Done(element)
}
}
func main() {
// 创建一个 WaitGroup,用于等待所有的 goroutine 完成
// Create a WaitGroup to wait for all goroutines to complete
wg := sync.WaitGroup{}
// 创建一个新的队列
// Create a new queue
queue := wkq.NewPriorityQueue(nil)
// 增加 WaitGroup 的计数器
// Increase the counter of the WaitGroup
wg.Add(1)
// 启动一个新的 goroutine 来运行 consumer 函数
// Start a new goroutine to run the consumer function
go consumer(queue, &wg)
// 将 "delay 1" 放入队列,并设置其优先级为 200
// Put "delay 1" into the queue and set its priority to 200
_ = queue.PutWithPriority("priority 1", 200)
// 将 "delay 2" 放入队列,并设置其优先级为 100
// Put "delay 2" into the queue and set its priority to 100
_ = queue.PutWithPriority("priority 2", 100)
// 将 "hello" 放入队列
// Put "hello" into the queue
_ = queue.Put("hello")
// 将 "world" 放入队列
// Put "world" into the queue
_ = queue.Put("world")
// 等待一秒钟,让 comsumer 有机会处理队列中的元素
// Wait for a second to give the comsumer a chance to process the elements in the queue
time.Sleep(time.Second)
// 关闭队列
// Shut down the queue
queue.Shutdown()
// 等待所有的 goroutine 完成
// Wait for all goroutines to complete
wg.Wait()
}
Result
$ go run demo.go
> get element: hello
> get element: world
> get element: priority 2
> get element: priority 1
queue is shutting down
The RateLimiting Queue
is a queue that supports rate-limited execution. It is built on top of the Delaying Queue
. When adding an element to the queue, you can specify the rate limit, and the element will be processed according to this rate limit.
Tip
The default rate limit is based on the Nop
strategy. You can define your own rate limit algorithm by implementing the Limiter
interface. The project provides a token bucket
algorithm as a Limiter implementation.
The RateLimiting Queue
inherits the configuration of the Delaying Queue
.
WithCallback
: Sets callback functions.WithLimiter
: Sets the rate limiter for the queue.
The RateLimiting Queue
inherits the methods of the Delaying Queue
. Additionally, it has the following method:
PutWithLimited
: Adds an element to the queue. The delay time of the element is determined by the limiter.
The RateLimiting Queue
inherits the callback of the Delaying Queue
. Additionally, it has the following method:
OnLimited
: Invoked when an element is added to the queue byPutWithLimited
.
package main
import (
"errors"
"fmt"
"sync"
"time"
wkq "github.com/shengyanli1982/workqueue/v2"
)
// consumer 函数是一个消费者函数,它从队列中获取元素并处理它们
// The consumer function is a consumer function that gets elements from the queue and processes them
func consumer(queue wkq.Queue, wg *sync.WaitGroup) {
// 当函数返回时,调用 wg.Done() 来通知 WaitGroup 一个任务已经完成
// When the function returns, call wg.Done() to notify the WaitGroup that a task has been completed
defer wg.Done()
// 无限循环,直到函数返回
// Infinite loop until the function returns
for {
// 从队列中获取一个元素
// Get an element from the queue
element, err := queue.Get()
// 如果获取元素时发生错误,则处理错误
// If an error occurs when getting the element, handle the error
if err != nil {
// 如果错误不是因为队列为空,则打印错误并返回
// If the error is not because the queue is empty, print the error and return
if !errors.Is(err, wkq.ErrQueueIsEmpty) {
fmt.Println(err)
return
} else {
// 如果错误是因为队列为空,则继续循环
// If the error is because the queue is empty, continue the loop
continue
}
}
// 打印获取到的元素
// Print the obtained element
fmt.Println("> get element:", element)
// 标记元素为已处理,'Done' 是在 'Get' 之后必需的
// Mark the element as done, 'Done' is required after 'Get'
queue.Done(element)
}
}
func main() {
// 创建一个 WaitGroup,用于等待所有的 goroutine 完成
// Create a WaitGroup to wait for all goroutines to complete
wg := sync.WaitGroup{}
// 创建一个新的桶形限流器,参数为桶的容量和填充速度
// Create a new bucket rate limiter, the parameters are the capacity of the bucket and the fill rate
limiter := wkq.NewBucketRateLimiterImpl(5, 1)
// 创建一个新的限流队列配置,并设置其限流器
// Create a new rate limiting queue configuration and set its limiter
config := wkq.NewRateLimitingQueueConfig().WithLimiter(limiter)
// 使用配置创建一个新的限流队列
// Create a new rate limiting queue with the configuration
queue := wkq.NewRateLimitingQueue(config)
// 增加 WaitGroup 的计数器
// Increase the counter of the WaitGroup
wg.Add(1)
// 启动一个新的 goroutine 来运行 consumer 函数
// Start a new goroutine to run the consumer function
go consumer(queue, &wg)
// 将 "delay 1" 放入队列,并设置其延迟时间为 200 毫秒
// Put "delay 1" into the queue and set its delay time to 200 milliseconds
_ = queue.PutWithDelay("delay 1", 200)
// 将 "delay 2" 放入队列,并设置其延迟时间为 100 毫秒
// Put "delay 2" into the queue and set its delay time to 100 milliseconds
_ = queue.PutWithDelay("delay 2", 100)
// 将 "hello" 放入队列
// Put "hello" into the queue
_ = queue.Put("hello")
// 将 "world" 放入队列
// Put "world" into the queue
_ = queue.Put("world")
// 将 "limited" 放入队列, 触发限流
// Put "limited" into the queue, trigger rate limiting
for i := 0; i < 10; i++ {
go func(i int) {
_ = queue.PutWithLimited(fmt.Sprintf("limited %d", i))
}(i)
}
// 等待一秒钟,让 comsumer 有机会处理队列中的元素
// Wait for a second to give the comsumer a chance to process the elements in the queue
time.Sleep(time.Second)
// 关闭队列
// Shut down the queue
queue.Shutdown()
// 等待所有的 goroutine 完成
// Wait for all goroutines to complete
wg.Wait()
}
Result
$ go run demo.go
> get element: hello
> get element: world
> get element: delay 2
> get element: delay 1
> get element: limited 9
> get element: limited 6
> get element: limited 7
> get element: limited 8
> get element: limited 3
> get element: limited 2
> get element: limited 0
> get element: limited 5
> get element: limited 1
> get element: limited 4
queue is shutting down