Skip to content

Commit

Permalink
fixup sync read
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall committed Dec 15, 2024
1 parent 753dcd6 commit 1627fc9
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions stomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,22 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew

// Process one message at a time
for {
msg := <-sub.C // Blocking read for one message
// Wait for the next message (blocks if the channel is empty)
msg, ok := <-sub.C
if !ok {
// Subscription is no longer active
return fmt.Errorf("subscription to %s is closed", queueName)
}

// Check for an empty or nil message
if msg == nil || len(msg.Body) == 0 {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
}
continue
}

// Process the message
// Process the message synchronously
handleMessage(msg, middleware)

// Acknowledge the message after successful processing
Expand All @@ -124,6 +131,7 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew
slog.Error("Failed to acknowledge message", "queue", queueName, "error", err)
}
}

}

func handleMessage(msg *stomp.Message, middleware scyllaridae.QueueMiddleware) {
Expand Down

0 comments on commit 1627fc9

Please sign in to comment.