Skip to content

Commit

Permalink
add switch for routing
Browse files Browse the repository at this point in the history
  • Loading branch information
devanbenz committed Mar 4, 2024
1 parent 3b112f7 commit d4e73d1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 23 deletions.
8 changes: 7 additions & 1 deletion cmd/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,27 @@ func main() {
defer conn.Close()
conn.RunMigrations()

q := queue.NewQueue(logger)
routingKeys := []string{
"actor.created",
}
q := queue.NewQueue(logger, routingKeys, "federation")
err = q.Connect()
if err != nil {
logger.Fatal("failed connecting to queue service", err)
}
defer q.Close()

err = q.StartConsumer("federation")
if err != nil {
logger.Fatal("failed starting to consumer", err)
}

config := http.ServerConfig{
Logger: logger,
Database: conn,
Queue: q,
}

s := http.NewServer(config)
s.RunServer()
}
2 changes: 1 addition & 1 deletion internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (server *Server) RunServer() {
server.Router.Use(server.logMiddleware)

srv := &http.Server{
Addr: "0.0.0.0:8080",
Addr: "0.0.0.0:3000",
// Good practice to set timeouts to avoid Slowloris attacks.
WriteTimeout: time.Second * 15,
ReadTimeout: time.Second * 15,
Expand Down
30 changes: 18 additions & 12 deletions internal/queue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package queue

import (
"errors"
"fmt"
)

func (q *RabbitQueue) createConsumer(queueName string, exchangeName string) error {
func (q *RabbitQueue) createConsumer(queueName string) error {
channelRabbitMQ, err := q.Connection.Channel()
if err != nil {
return err
}

err = channelRabbitMQ.ExchangeDeclare(
exchangeName,
"topic",
q.exchangeName,
"direct",
true,
false,
false,
Expand All @@ -23,13 +24,13 @@ func (q *RabbitQueue) createConsumer(queueName string, exchangeName string) erro
return err
}

for _, key := range q.routingKeys {
err = q.createQueue(channelRabbitMQ, queueName)
if err != nil {
return err
}
err = q.createQueue(channelRabbitMQ, queueName)
if err != nil {
return err
}

err = channelRabbitMQ.QueueBind(queueName, key, exchangeName, false, nil)
for _, key := range q.routingKeys {
err = channelRabbitMQ.QueueBind(queueName, key, q.exchangeName, false, nil)
if err != nil {
return err
}
Expand All @@ -55,8 +56,8 @@ func (q *RabbitQueue) createConsumer(queueName string, exchangeName string) erro
}

// TODO: Implement a way to either pass a callback function or return messages/chan
func (q *RabbitQueue) StartConsumer(queueName string, exchangeName string) error {
err := q.createConsumer(queueName, exchangeName)
func (q *RabbitQueue) StartConsumer(queueName string) error {
err := q.createConsumer(queueName)
if err != nil {
return err
}
Expand All @@ -68,7 +69,12 @@ func (q *RabbitQueue) StartConsumer(queueName string, exchangeName string) error

go func() {
for message := range messages {
q.logger.Printf(" > Received message: %s\n", message.Body)
switch message.RoutingKey {
case "actor.created":
q.logger.Printf(" > Received message: %s\n", message.Body)
default:
q.logger.Warn(fmt.Sprintf("%s is not a valid routing key", message.RoutingKey))
}
}
}()

Expand Down
20 changes: 11 additions & 9 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ type Queue interface {

type RabbitQueue struct {
*amqp.Connection
publishers map[string]*publisher
consumers map[string]<-chan amqp.Delivery
routingKeys []string
logger *log.Log
exchangeName string
publishers map[string]*publisher
consumers map[string]<-chan amqp.Delivery
routingKeys []string
logger *log.Log
}

func NewQueue(logger *log.Log, routingKeys []string) Queue {
func NewQueue(logger *log.Log, routingKeys []string, exchangeName string) Queue {
return &RabbitQueue{
logger: logger,
publishers: make(map[string]*publisher),
consumers: make(map[string]<-chan amqp.Delivery),
routingKeys: routingKeys,
logger: logger,
publishers: make(map[string]*publisher),
consumers: make(map[string]<-chan amqp.Delivery),
exchangeName: exchangeName,
routingKeys: routingKeys,
}
}

Expand Down

0 comments on commit d4e73d1

Please sign in to comment.