diff --git a/stomp.go b/stomp.go index 6c99046..8350f31 100644 --- a/stomp.go +++ b/stomp.go @@ -56,7 +56,7 @@ func runStompSubscribers(config *scyllaridae.ServerConfig) { func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddleware) error { addr := os.Getenv("STOMP_SERVER_ADDR") if addr == "" { - addr = "activemq:61613" + addr = "activemq:61613?activemq.prefetchSize=1" } c, err := net.Dial("tcp", addr) @@ -78,7 +78,7 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew return err } - conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 0*time.Second)) + conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 10*time.Second)) if err != nil { slog.Error("Cannot connect to STOMP server", "err", err.Error()) return err @@ -89,7 +89,7 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew slog.Error("Problem disconnecting from STOMP server", "err", err) } }() - sub, err := conn.Subscribe(queueName, stomp.AckClient) + sub, err := conn.Subscribe(queueName, stomp.AckClient, stomp.SubscribeOpt.Prefetch(1)) if err != nil { slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error()) return err