Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue timeout #2455

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/dequeuer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
flag.StringVar(&clusterUID, "cluster-uid", "", "cluster unique identifier")
flag.StringVar(&probesPath, "probes-path", "", "path to the probes spec")
flag.StringVar(&queueURL, "queue", "", "target queue URL from which the api messages will be dequeued")
// TODO add queue_timeout_seconds
flag.StringVar(&apiKind, "api-kind", "", fmt.Sprintf("api kind (%s|%s)", userconfig.BatchAPIKind.String(), userconfig.AsyncAPIKind.String()))
flag.StringVar(&apiName, "api-name", "", "api name")
flag.StringVar(&jobID, "job-id", "", "job ID")
Expand Down
1 change: 1 addition & 0 deletions docs/workloads/async/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
pod: # pod configuration (required)
port: <int> # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT)
max_concurrency: <int> # maximum number of requests that will be concurrently sent into the container (default: 1, max allowed: 100)
queue_timeout_seconds: <int> # maximum amount of time a request can be queued before beginning to be processed
containers: # configurations for the containers to run (at least one constainer must be provided)
- name: <string> # name of the container (required)
image: <string> # docker image to use for the container (required)
Expand Down
1 change: 1 addition & 0 deletions docs/workloads/realtime/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
port: <int> # port to which requests will be sent (default: 8080; exported as $CORTEX_PORT)
max_concurrency: <int> # maximum number of requests that will be concurrently sent into the container (default: 1)
max_queue_length: <int> # maximum number of requests per replica which will be queued (beyond max_concurrency) before requests are rejected with error code 503 (default: 100)
queue_timeout_seconds: <int> # maximum amount of time a request can be queued before beginning to be processed
containers: # configurations for the containers to run (at least one constainer must be provided)
- name: <string> # name of the container (required)
image: <string> # docker image to use for the container (required)
Expand Down
18 changes: 16 additions & 2 deletions pkg/dequeuer/async_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sqs"
awslib "github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/debug"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
"github.com/cortexlabs/cortex/pkg/types/async"
"go.uber.org/zap"
Expand Down Expand Up @@ -75,15 +78,26 @@ func (h *AsyncMessageHandler) Handle(message *sqs.Message) error {
return errors.ErrorUnexpected("got unexpected sqs message with empty or nil body")
}

var msgSentTime *time.Time
if msgSentTimestamp, ok := message.Attributes[sqs.MessageSystemAttributeNameSentTimestamp]; ok {
if msgSentTimestamp != nil {
if parsed, ok := s.ParseInt64(*msgSentTimestamp); ok {
msgSentTime = pointer.Time(time.UnixMilli(parsed))
}
}
}

requestID := *message.Body
err := h.handleMessage(requestID)
err := h.handleMessage(requestID, msgSentTime)
if err != nil {
return err
}
return nil
}

func (h *AsyncMessageHandler) handleMessage(requestID string) error {
func (h *AsyncMessageHandler) handleMessage(requestID string, msgSentTime *time.Time) error {
debug.Ppg(msgSentTime)

h.log.Infow("processing workload", "id", requestID)

err := h.updateStatus(requestID, async.StatusInProgress)
Expand Down
1 change: 1 addition & 0 deletions pkg/dequeuer/dequeuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (d *SQSDequeuer) ReceiveMessage() (*sqs.Message, error) {
MessageAttributeNames: aws.StringSlice(_messageAttributes),
VisibilityTimeout: d.visibilityTimeout,
WaitTimeSeconds: d.waitTimeSeconds,
AttributeNames: aws.StringSlice([]string{sqs.MessageSystemAttributeNameSentTimestamp}),
})

if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/lib/k8s/virtual_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type VirtualServiceSpec struct {
Labels map[string]string
Annotations map[string]string
Headers *istionetworking.Headers
Retries *int32
}

type Destination struct {
Expand Down Expand Up @@ -153,6 +154,14 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
httpRoutes = append(httpRoutes, exactMatch, prefixMatch)
}

if spec.Retries != nil {
for i := range httpRoutes {
httpRoutes[i].Retries = &istionetworking.HTTPRetry{
Attempts: *spec.Retries,
}
}
}

virtualService := &istioclientnetworking.VirtualService{
TypeMeta: _virtualServiceTypeMeta,
ObjectMeta: kmeta.ObjectMeta{
Expand Down
1 change: 1 addition & 0 deletions pkg/operator/resources/realtimeapi/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService {
},
PrefixPath: api.Networking.Endpoint,
Rewrite: pointer.String("/"),
Retries: pointer.Int32(0),
Annotations: api.ToK8sAnnotations(),
Labels: map[string]string{
"apiName": api.Name,
Expand Down
1 change: 1 addition & 0 deletions pkg/operator/resources/trafficsplitter/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func virtualServiceSpec(trafficSplitter *spec.API) *istioclientnetworking.Virtua
Destinations: getTrafficSplitterDestinations(trafficSplitter),
ExactPath: trafficSplitter.Networking.Endpoint,
Rewrite: pointer.String("/"),
Retries: pointer.Int32(0),
Annotations: trafficSplitter.ToK8sAnnotations(),
Labels: map[string]string{
"apiName": trafficSplitter.Name,
Expand Down
37 changes: 36 additions & 1 deletion pkg/proxy/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (
"context"
"errors"
"fmt"
"time"

"go.uber.org/atomic"
)

var (
// ErrRequestQueueFull indicates the breaker queue depth was exceeded.
ErrRequestQueueFull = errors.New("pending request queue full")
ErrQueueTimeout = errors.New("queue timeout")
)

// BreakerParams defines the parameters of the breaker.
Expand Down Expand Up @@ -139,7 +141,8 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) error {
defer b.releasePending()

// Wait for capacity in the active queue.
if err := b.sem.acquire(ctx); err != nil {
// TODO use the actual timeout
if err := b.sem.acquireWithTimeout(ctx, 0); err != nil {
return err
}
// Defer releasing capacity in the active.
Expand Down Expand Up @@ -239,6 +242,38 @@ func (s *semaphore) acquire(ctx context.Context) error {
}
}

// acquireWithTimeout acquires capacity from the semaphore, with a timeout.
// if timeout <= 0, no timeout is used
func (s *semaphore) acquireWithTimeout(ctx context.Context, timeout time.Duration) error {
if timeout <= 0 {
return s.acquire(ctx)
}

timer := time.NewTimer(timeout)

for {
old := s.state.Load()
capacity, in := unpack(old)

if in >= capacity {
select {
case <-timer.C:
return ErrQueueTimeout
case <-ctx.Done():
return ctx.Err()
case <-s.queue:
}
// Force reload state.
continue
}

in++
if s.state.CAS(old, pack(capacity, in)) {
return nil
}
}
}

// release releases capacity in the semaphore.
// If the semaphore capacity was reduced in between and as a result inFlight is greater
// than capacity, we don't wake up goroutines as they'd not get any capacity anyway.
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Handler(breaker *Breaker, next http.Handler) http.HandlerFunc {
if err := breaker.Maybe(r.Context(), func() {
next.ServeHTTP(w, r)
}); err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrRequestQueueFull) {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrRequestQueueFull) || errors.Is(err, ErrQueueTimeout) {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
} else {
w.WriteHeader(http.StatusInternalServerError)
Expand Down