diff --git a/stomp.go b/stomp.go index cfed9be..6c99046 100644 --- a/stomp.go +++ b/stomp.go @@ -107,7 +107,14 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew // Process one message at a time for { - msg := <-sub.C // Blocking read for one message + // Wait for the next message (blocks if the channel is empty) + msg, ok := <-sub.C + if !ok { + // Subscription is no longer active + return fmt.Errorf("subscription to %s is closed", queueName) + } + + // Check for an empty or nil message if msg == nil || len(msg.Body) == 0 { if !sub.Active() { return fmt.Errorf("no longer subscribed to %s", queueName) @@ -115,7 +122,7 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew continue } - // Process the message + // Process the message synchronously handleMessage(msg, middleware) // Acknowledge the message after successful processing @@ -124,6 +131,7 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew slog.Error("Failed to acknowledge message", "queue", queueName, "error", err) } } + } func handleMessage(msg *stomp.Message, middleware scyllaridae.QueueMiddleware) {