Skip to content

Commit

Permalink
Make reading off the queue synchronous (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall authored Dec 4, 2024
1 parent c386a3b commit 8df9832
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 52 deletions.
2 changes: 1 addition & 1 deletion ci/k8s/hypercube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ spec:
spec:
containers:
- name: scyllaridae-tesseract
image: lehighlts/scyllaridae-tesseract:main
image: lehighlts/scyllaridae-tesseract:main-14b2276
imagePullPolicy: IfNotPresent
resources:
requests:
Expand Down
7 changes: 5 additions & 2 deletions ci/k8s/mergepdf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ spec:
spec:
containers:
- name: scyllaridae-mergepdf
image: lehighlts/scyllaridae-mergepdf:main
image: lehighlts/scyllaridae-mergepdf:main-14b2276
imagePullPolicy: IfNotPresent
env:
- name: MAX_THREADS
value: "7"
resources:
requests:
memory: "128Mi"
cpu: "500m"
limits:
memory: "1Gi"
memory: "4Gi"
ports:
- containerPort: 8080
hostPort: 8887
Expand Down
2 changes: 1 addition & 1 deletion examples/tesseract/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ RUN --mount=type=bind,from=leptonica,source=/packages,target=/packages \
tesseract-ocr-data-deu==5.3.4-r0 \
tesseract-ocr-data-jpn==5.3.4-r0 \
tesseract-ocr-data-rus==5.3.4-r0 \
poppler-utils==24.02.0-r1
poppler-utils==24.02.0-r2

COPY scyllaridae.yml /app/scyllaridae.yml
79 changes: 31 additions & 48 deletions stomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/base64"
"fmt"
"log/slog"
"math/rand"
"net"
"net/http"
"os"
Expand All @@ -28,46 +27,32 @@ func runStompSubscribers(config *scyllaridae.ServerConfig) {
wg.Add(1)
go func(middleware scyllaridae.QueueMiddleware) {
defer wg.Done()
messageChan := make(chan *stomp.Message, middleware.Consumers)

// Start the specified number of worker goroutines
for i := 0; i < middleware.Consumers; i++ {
slog.Info("Adding consumer", "consumer", i)
go worker(messageChan, middleware)
for {
select {
case <-stopChan:
slog.Info("Stopping subscriber for queue", "queue", middleware.QueueName)
return
default:
// Process one message at a time
err := RecvAndProcessMessage(middleware.QueueName, middleware)
if err != nil {
slog.Error("Error processing message", "queue", middleware.QueueName, "error", err)
}
}
}

RecvStompMessages(middleware.QueueName, messageChan)
}(middleware)
}

// Wait for a termination signal
<-stopChan
slog.Info("Shutting down message listener")
}

func worker(messageChan <-chan *stomp.Message, middleware scyllaridae.QueueMiddleware) {
for msg := range messageChan {
handleMessage(msg, middleware)
}
}
slog.Info("Shutting down message listeners")

func RecvStompMessages(queueName string, messageChan chan<- *stomp.Message) {
attempt := 0
maxAttempts := 30
for attempt = 0; attempt < maxAttempts; attempt++ {
if err := connectAndSubscribe(queueName, messageChan); err != nil {
slog.Error("Resubscribing", "queue", queueName, "error", err)
if err := retryWithExponentialBackoff(attempt, maxAttempts); err != nil {
slog.Error("Failed subscribing after too many failed attempts", "queue", queueName, "attempts", attempt)
return
}
} else {
// Subscription was successful
break
}
}
// Wait for all subscribers to gracefully stop
wg.Wait()
}

func connectAndSubscribe(queueName string, messageChan chan<- *stomp.Message) error {
func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddleware) error {
addr := os.Getenv("STOMP_SERVER_ADDR")
if addr == "" {
addr = "activemq:61613"
Expand Down Expand Up @@ -103,8 +88,7 @@ func connectAndSubscribe(queueName string, messageChan chan<- *stomp.Message) er
slog.Error("Problem disconnecting from STOMP server", "err", err)
}
}()

sub, err := conn.Subscribe(queueName, stomp.AckAuto)
sub, err := conn.Subscribe(queueName, stomp.AckClient)
if err != nil {
slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
Expand All @@ -118,19 +102,27 @@ func connectAndSubscribe(queueName string, messageChan chan<- *stomp.Message) er
slog.Error("Problem unsubscribing", "err", err)
}
}()
slog.Info("Server subscribed to", "queue", queueName)
slog.Info("Subscribed to queue", "queue", queueName)

for msg := range sub.C {
// Process one message at a time
for {
msg := <-sub.C // Blocking read for one message
if msg == nil || len(msg.Body) == 0 {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
}
continue
}
messageChan <- msg // Send the message to the channel
}

return nil
// Process the message
handleMessage(msg, middleware)

// Acknowledge the message after successful processing
err := msg.Conn.Ack(msg)
if err != nil {
slog.Error("Failed to acknowledge message", "queue", queueName, "error", err)
}
}
}

func handleMessage(msg *stomp.Message, middleware scyllaridae.QueueMiddleware) {
Expand Down Expand Up @@ -195,12 +187,3 @@ func handleMessage(msg *stomp.Message, middleware scyllaridae.QueueMiddleware) {
slog.Info("Successfully PUT data to", "url", islandoraMessage.Attachment.Content.DestinationURI, "status", putResp.StatusCode)
}
}

func retryWithExponentialBackoff(attempt int, maxAttempts int) error {
if attempt >= maxAttempts {
return fmt.Errorf("maximum retry attempts reached")
}
wait := time.Duration(rand.Intn(1<<attempt)) * time.Second
time.Sleep(wait)
return nil
}

0 comments on commit 8df9832

Please sign in to comment.