Skip to content

Commit

Permalink
refactor out semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi committed Sep 13, 2023
1 parent e9c8eb5 commit d551711
Showing 1 changed file with 50 additions and 14 deletions.
64 changes: 50 additions & 14 deletions pkg/clients/cloudwatch/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ type ConcurrencyConfig struct {
GetMetricStatistics int
}

// semaphore implements a simple semaphore using a channel.
type semaphore chan struct{}

// newSemaphore creates a new semaphore with the given limit.
func newSemaphore(limit int) semaphore {
return make(semaphore, limit)
}

func (s semaphore) Acquire() {
s <- struct{}{}
}

func (s semaphore) Release() {
<-s
}

// NewLimiter creates a new ConcurrencyLimiter, according to the ConcurrencyConfig.
func (cfg ConcurrencyConfig) NewLimiter() ConcurrencyLimiter {
if cfg.PerAPIEnabled {
Expand All @@ -31,39 +47,59 @@ func (cfg ConcurrencyConfig) NewLimiter() ConcurrencyLimiter {
// perAPICallLimiter is a ConcurrencyLimiter that keeps a different concurrency limiter per different API call. This allows
// a more granular control of concurrency, allowing us to take advantage of different api limits. For example, ListMetrics
// has a limit of 25 TPS, while GetMetricData has none.
type perAPICallLimiter map[string]chan struct{}
type perAPICallLimiter struct {
listMetricsLimiter semaphore
getMetricsDataLimiter semaphore
getMetricsStatisticsLimiter semaphore
}

// NewPerAPICallLimiter creates a new PerAPICallLimiter.
func NewPerAPICallLimiter(listMetrics, getMetricData, getMetricStatistics int) ConcurrencyLimiter {
return perAPICallLimiter(map[string]chan struct{}{
listMetricsCall: make(chan struct{}, listMetrics),
getMetricDataCall: make(chan struct{}, getMetricData),
getMetricStatisticsCall: make(chan struct{}, getMetricStatistics),
})
return &perAPICallLimiter{
listMetricsLimiter: newSemaphore(listMetrics),
getMetricsDataLimiter: newSemaphore(getMetricData),
getMetricsStatisticsLimiter: newSemaphore(getMetricStatistics),
}
}

func (l perAPICallLimiter) Acquire(op string) {
l[op] <- struct{}{}
func (l *perAPICallLimiter) Acquire(op string) {
switch op {
case listMetricsCall:
l.listMetricsLimiter.Acquire()
case getMetricDataCall:
l.getMetricsDataLimiter.Acquire()
case getMetricStatisticsCall:
l.getMetricsStatisticsLimiter.Acquire()
}
}

func (l perAPICallLimiter) Release(op string) {
<-l[op]
func (l *perAPICallLimiter) Release(op string) {
switch op {
case listMetricsCall:
l.listMetricsLimiter.Release()
case getMetricDataCall:
l.getMetricsDataLimiter.Release()
case getMetricStatisticsCall:
l.getMetricsStatisticsLimiter.Release()
}
}

// singleLimiter is the current implementation of ConcurrencyLimiter, which has a single limit for all different API calls.
type singleLimiter struct {
sem chan struct{}
s semaphore
}

// NewSingleLimiter creates a new SingleLimiter.
func NewSingleLimiter(limit int) ConcurrencyLimiter {
return &singleLimiter{sem: make(chan struct{}, limit)}
return &singleLimiter{
s: newSemaphore(limit),
}
}

func (sl *singleLimiter) Acquire(_ string) {
sl.sem <- struct{}{}
sl.s.Acquire()
}

func (sl *singleLimiter) Release(_ string) {
<-sl.sem
sl.s.Release()
}

0 comments on commit d551711

Please sign in to comment.