diff --git a/cmd/federation.go b/cmd/federation.go index 1a43db7..c651143 100644 --- a/cmd/federation.go +++ b/cmd/federation.go @@ -28,21 +28,27 @@ func main() { defer conn.Close() conn.RunMigrations() - q := queue.NewQueue(logger) + routingKeys := []string{ + "actor.created", + } + q := queue.NewQueue(logger, routingKeys, "federation") err = q.Connect() if err != nil { logger.Fatal("failed connecting to queue service", err) } defer q.Close() + err = q.StartConsumer("federation") if err != nil { logger.Fatal("failed starting to consumer", err) } + config := http.ServerConfig{ Logger: logger, Database: conn, Queue: q, } + s := http.NewServer(config) s.RunServer() } diff --git a/internal/http/server.go b/internal/http/server.go index 69ad3ef..3aaed52 100644 --- a/internal/http/server.go +++ b/internal/http/server.go @@ -52,7 +52,7 @@ func (server *Server) RunServer() { server.Router.Use(server.logMiddleware) srv := &http.Server{ - Addr: "0.0.0.0:8080", + Addr: "0.0.0.0:3000", // Good practice to set timeouts to avoid Slowloris attacks. WriteTimeout: time.Second * 15, ReadTimeout: time.Second * 15, diff --git a/internal/queue/consumer.go b/internal/queue/consumer.go index 43eed2e..5959d81 100644 --- a/internal/queue/consumer.go +++ b/internal/queue/consumer.go @@ -2,17 +2,18 @@ package queue import ( "errors" + "fmt" ) -func (q *RabbitQueue) createConsumer(queueName string, exchangeName string) error { +func (q *RabbitQueue) createConsumer(queueName string) error { channelRabbitMQ, err := q.Connection.Channel() if err != nil { return err } err = channelRabbitMQ.ExchangeDeclare( - exchangeName, - "topic", + q.exchangeName, + "direct", true, false, false, @@ -23,13 +24,13 @@ func (q *RabbitQueue) createConsumer(queueName string, exchangeName string) erro return err } - for _, key := range q.routingKeys { - err = q.createQueue(channelRabbitMQ, queueName) - if err != nil { - return err - } + err = q.createQueue(channelRabbitMQ, queueName) + if err != nil { + return err + } - err = channelRabbitMQ.QueueBind(queueName, key, exchangeName, false, nil) + for _, key := range q.routingKeys { + err = channelRabbitMQ.QueueBind(queueName, key, q.exchangeName, false, nil) if err != nil { return err } @@ -55,8 +56,8 @@ func (q *RabbitQueue) createConsumer(queueName string, exchangeName string) erro } // TODO: Implement a way to either pass a callback function or return messages/chan -func (q *RabbitQueue) StartConsumer(queueName string, exchangeName string) error { - err := q.createConsumer(queueName, exchangeName) +func (q *RabbitQueue) StartConsumer(queueName string) error { + err := q.createConsumer(queueName) if err != nil { return err } @@ -68,7 +69,12 @@ func (q *RabbitQueue) StartConsumer(queueName string, exchangeName string) error go func() { for message := range messages { - q.logger.Printf(" > Received message: %s\n", message.Body) + switch message.RoutingKey { + case "actor.created": + q.logger.Printf(" > Received message: %s\n", message.Body) + default: + q.logger.Warn(fmt.Sprintf("%s is not a valid routing key", message.RoutingKey)) + } } }() diff --git a/internal/queue/queue.go b/internal/queue/queue.go index e6d32cc..340cb82 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -17,18 +17,20 @@ type Queue interface { type RabbitQueue struct { *amqp.Connection - publishers map[string]*publisher - consumers map[string]<-chan amqp.Delivery - routingKeys []string - logger *log.Log + exchangeName string + publishers map[string]*publisher + consumers map[string]<-chan amqp.Delivery + routingKeys []string + logger *log.Log } -func NewQueue(logger *log.Log, routingKeys []string) Queue { +func NewQueue(logger *log.Log, routingKeys []string, exchangeName string) Queue { return &RabbitQueue{ - logger: logger, - publishers: make(map[string]*publisher), - consumers: make(map[string]<-chan amqp.Delivery), - routingKeys: routingKeys, + logger: logger, + publishers: make(map[string]*publisher), + consumers: make(map[string]<-chan amqp.Delivery), + exchangeName: exchangeName, + routingKeys: routingKeys, } }