diff --git a/concurrent_map.go b/concurrent_map.go index baccab0..d0778ad 100644 --- a/concurrent_map.go +++ b/concurrent_map.go @@ -16,8 +16,9 @@ type Stringer interface { // A "thread" safe map of type string:Anything. // To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. type ConcurrentMap[K comparable, V any] struct { - shards []*ConcurrentMapShared[K, V] - sharding func(key K) uint32 + shards []*ConcurrentMapShared[K, V] + sharding func(key K) uint32 + shardCount uint32 } // A "thread" safe string to anything map. @@ -26,12 +27,13 @@ type ConcurrentMapShared[K comparable, V any] struct { sync.RWMutex // Read Write mutex, guards access to internal map. } -func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { +func create[K comparable, V any](sharding func(key K) uint32, shardCount uint32) ConcurrentMap[K, V] { m := ConcurrentMap[K, V]{ - sharding: sharding, - shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT), + sharding: sharding, + shards: make([]*ConcurrentMapShared[K, V], shardCount), + shardCount: shardCount, } - for i := 0; i < SHARD_COUNT; i++ { + for i := 0; i < int(m.shardCount); i++ { m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)} } return m @@ -39,22 +41,27 @@ func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V // Creates a new concurrent map. func New[V any]() ConcurrentMap[string, V] { - return create[string, V](fnv32) + return create[string, V](fnv32, uint32(SHARD_COUNT)) } // Creates a new concurrent map. func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] { - return create[K, V](strfnv32[K]) + return create[K, V](strfnv32[K], uint32(SHARD_COUNT)) } // Creates a new concurrent map. func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { - return create[K, V](sharding) + return create[K, V](sharding, uint32(SHARD_COUNT)) +} + +// NewWithCustomShardingCountFunction Create a new concurrent map using the given shardCount +func NewWithCustomShardingCountFunction[K comparable, V any](sharding func(key K) uint32, shardCount uint32) ConcurrentMap[K, V] { + return create[K, V](sharding, shardCount) } // GetShard returns shard under given key func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] { - return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)] + return m.shards[uint(m.sharding(key))%uint(m.shardCount)] } func (m ConcurrentMap[K, V]) MSet(data map[K]V) { @@ -119,7 +126,7 @@ func (m ConcurrentMap[K, V]) Get(key K) (V, bool) { // Count returns the number of elements within the map. func (m ConcurrentMap[K, V]) Count() int { count := 0 - for i := 0; i < SHARD_COUNT; i++ { + for i := 0; i < int(m.shardCount); i++ { shard := m.shards[i] shard.RLock() count += len(shard.items) @@ -228,9 +235,9 @@ func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, if len(m.shards) == 0 { panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`) } - chans = make([]chan Tuple[K, V], SHARD_COUNT) + chans = make([]chan Tuple[K, V], m.shardCount) wg := sync.WaitGroup{} - wg.Add(SHARD_COUNT) + wg.Add(int(m.shardCount)) // Foreach shard. for index, shard := range m.shards { go func(index int, shard *ConcurrentMapShared[K, V]) { @@ -303,7 +310,7 @@ func (m ConcurrentMap[K, V]) Keys() []K { go func() { // Foreach shard. wg := sync.WaitGroup{} - wg.Add(SHARD_COUNT) + wg.Add(int(m.shardCount)) for _, shard := range m.shards { go func(shard *ConcurrentMapShared[K, V]) { // Foreach key, value pair.