Skip to content

Commit

Permalink
Refactor Queue to use interfaces and inject into server
Browse files Browse the repository at this point in the history
  • Loading branch information
lazyguru committed Feb 6, 2024
1 parent 7b78045 commit 444c437
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 33 deletions.
17 changes: 7 additions & 10 deletions cmd/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,25 @@ func main() {
defer conn.Close()
conn.RunMigrations()

mqConnection, err := queue.Connect()
q := queue.NewQueue(logger)
err = q.Connect()
if err != nil {
logger.Fatal("failed connecting to queue service", err)
}
defer mqConnection.Close()
producer, err := queue.CreateProducer(mqConnection, "backend")
defer q.Close()
err = q.CreateProducer("backend")
if err != nil {
logger.Fatal("failed creating producer", err)
}
defer producer.Close()
messages, err := queue.CreateConsumer(mqConnection, "federation")
err = q.CreateConsumer("federation")
if err != nil {
logger.Fatal("failed creating consumer", err)
}
go func() {
for message := range messages {
logger.Debug(fmt.Sprintf(" > Received message: %s\n", message.Body))
}
}()
q.StartConsumer("federation")
config := http.ServerConfig{
Logger: logger,
Database: conn,
Queue: q,
}
s := http.NewServer(config)
s.RunServer()
Expand Down
4 changes: 4 additions & 0 deletions internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"sublinks/sublinks-federation/internal/db"
"sublinks/sublinks-federation/internal/log"
"sublinks/sublinks-federation/internal/queue"
"time"

"github.com/gorilla/mux"
Expand All @@ -17,11 +18,13 @@ type Server struct {
*mux.Router
log.Logger
*db.Database
*queue.Queue
}

type ServerConfig struct {
log.Logger
*db.Database
*queue.Queue
}

func NewServer(config ServerConfig) *Server {
Expand All @@ -31,6 +34,7 @@ func NewServer(config ServerConfig) *Server {
Router: r,
Logger: config.Logger,
Database: config.Database,
Queue: config.Queue,
}
}

Expand Down
31 changes: 20 additions & 11 deletions internal/queue/consumer.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package queue

import (
amqp "github.com/rabbitmq/amqp091-go"
)

func CreateConsumer(q *amqp.Connection, queueName string) (<-chan amqp.Delivery, error) {
channelRabbitMQ, err := q.Channel()
func (q *Queue) CreateConsumer(queueName string) error {
channelRabbitMQ, err := q.Connection.Channel()
if err != nil {
return nil, err
return err
}
err = CreateQueue(channelRabbitMQ, queueName)
err = q.CreateQueue(channelRabbitMQ, queueName)
if err != nil {
return nil, err
return err
}
// Subscribing to QueueService1 for getting messages.
messages, err := channelRabbitMQ.Consume(
Expand All @@ -24,7 +20,20 @@ func CreateConsumer(q *amqp.Connection, queueName string) (<-chan amqp.Delivery,
nil, // arguments
)
if err != nil {
return nil, err
return err
}
q.Consumers[queueName] = messages
return nil
}

func (q *Queue) StartConsumer(queueName string) {
messages, ok := q.Consumers[queueName]
if !ok {
return
}
return messages, nil
go func() {
for message := range messages {
q.Logger.Printf(" > Received message: %s\n", message.Body)
}
}()
}
22 changes: 14 additions & 8 deletions internal/queue/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,26 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

func CreateProducer(q *amqp.Connection, queueName string) (*amqp.Channel, error) {
channelRabbitMQ, err := q.Channel()
type Publisher struct {
QueueName string
*amqp.Channel
}

func (q *Queue) CreateProducer(queueName string) error {
channelRabbitMQ, err := q.Connection.Channel()
if err != nil {
return nil, err
return err
}
err = CreateQueue(channelRabbitMQ, queueName)
err = q.CreateQueue(channelRabbitMQ, queueName)
if err != nil {
return nil, err
return err
}
return channelRabbitMQ, nil
q.Publishers[queueName] = &Publisher{queueName, channelRabbitMQ}
return nil
}

func PublishMessage(q *amqp.Channel, message string) error {
return q.PublishWithContext(
func (p *Publisher) PublishMessage(message string) error {
return p.Channel.PublishWithContext(
context.TODO(),
"backend", // exchange
"", // routing key
Expand Down
29 changes: 25 additions & 4 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,35 @@ package queue
import (
"os"

"sublinks/sublinks-federation/internal/log"

amqp "github.com/rabbitmq/amqp091-go"
)

func Connect() (*amqp.Connection, error) {
type Queue struct {
*amqp.Connection
Publishers map[string]*Publisher
Consumers map[string]<-chan amqp.Delivery
Logger *log.Log
}

func NewQueue(logger *log.Log) *Queue {
return &Queue{Logger: logger}
}

func (q *Queue) Connect() error {
// Get the connection string from the environment variable
amqpServerURL := os.Getenv("AMQP_SERVER_URL")
// Create a new RabbitMQ connection.
connectRabbitMQ, err := amqp.Dial(amqpServerURL)
if err != nil {
return nil, err
return err
}
return connectRabbitMQ, nil
q.Connection = connectRabbitMQ
return nil
}

func CreateQueue(channelRabbitMQ *amqp.Channel, queueName string) error {
func (q *Queue) CreateQueue(channelRabbitMQ *amqp.Channel, queueName string) error {
// With the instance and declare Queues that we can
// publish and subscribe to.
_, err := channelRabbitMQ.QueueDeclare(
Expand All @@ -30,3 +44,10 @@ func CreateQueue(channelRabbitMQ *amqp.Channel, queueName string) error {
)
return err
}

func (q *Queue) Close() {
for _, publisher := range q.Publishers {
publisher.Close()
}
q.Connection.Close()
}

0 comments on commit 444c437

Please sign in to comment.