github.com/nuclio/amqp is an AMQP 1.0 client implementation for Go.
AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today. A list of AMQP 1.0 brokers and other AMQP 1.0 resources can be found at github.com/xinchen10/awesome-amqp.
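One way to see the incompatibility is in the protocol header, the first 8 bytes a client sends on a new connection: an AMQP 1.0 peer announces `AMQP` followed by 0, 1, 0, 0, while an AMQP 0-9-1 peer announces `AMQP` followed by 0, 0, 9, 1. The sketch below only compares those headers. The names `headerAMQP10`, `headerAMQP091`, and `protocolVersion` are illustrative and are not part of this package.

```go
package main

import (
	"bytes"
	"fmt"
)

// Protocol headers as sent at the start of a connection.
// These identifiers are illustrative, not part of the amqp package.
var (
	headerAMQP10  = []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0} // AMQP 1.0
	headerAMQP091 = []byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1} // AMQP 0-9-1
)

// protocolVersion names the protocol announced by an 8-byte header,
// or returns "unknown" if the header matches neither version.
func protocolVersion(header []byte) string {
	switch {
	case bytes.Equal(header, headerAMQP10):
		return "AMQP 1.0"
	case bytes.Equal(header, headerAMQP091):
		return "AMQP 0-9-1"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(protocolVersion(headerAMQP10))
	fmt.Println(protocolVersion(headerAMQP091))
}
```

A broker that speaks only one of these versions will not accept the other header, which is why a 0-9-1 broker cannot be used with this client.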
This project is currently in alpha, though my employer is using it in a pre-production capacity.
The API is subject to change until 1.0.0. If you choose to use this library, please vendor it.
go get -u github.com/nuclio/amqp
I'm happy to accept contributions. A proper CONTRIBUTING.md is in the works. In the interim, please open an issue before beginning work so we can discuss it. I want to ensure there is no duplication of effort and that any new functionality fits with the goals of the project.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/nuclio/amqp"
)

func main() {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
		amqp.ConnSASLPlain("access-key-name", "access-key"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	ctx := context.Background()

	// Send a message
	{
		// Create a sender
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("/queue-name"),
		)
		if err != nil {
			log.Fatal("Creating sender link:", err)
		}

		// Give the send at most 5 seconds to complete
		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

		// Send message
		err = sender.Send(ctx, &amqp.Message{
			Data: []byte("Hello!"),
		})
		if err != nil {
			log.Fatal("Sending message:", err)
		}

		cancel()
		sender.Close()
	}

	// Continuously read messages
	{
		// Create a receiver
		receiver, err := session.NewReceiver(
			amqp.LinkSourceAddress("/queue-name"),
			amqp.LinkCredit(10),
		)
		if err != nil {
			log.Fatal("Creating receiver link:", err)
		}

		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		for {
			// Receive next message
			msg, err := receiver.Receive(ctx)
			if err != nil {
				log.Fatal("Reading message from AMQP:", err)
			}

			// Accept message
			msg.Accept()

			fmt.Printf("Message received: %s\n", msg.Data)
		}
	}
}
- Closing a session does not send an end performative.
- Testing should be improved. Currently, fuzz testing and basic Azure Service Bus integration testing are being performed.
- Set sender filters to support Azure Event Hubs. (Supported as of 0.3.0)
- Support message producer operations. (Supported as of 0.2.0)
By default, this package depends only on the standard library. Building with the pkgerrors tag will cause errors to be created/wrapped by the github.com/pkg/errors library. This can be useful for debugging and when used in a project using github.com/pkg/errors.
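The general mechanism behind a tag like this is a Go build constraint: two files define the same helpers, and the tag decides which file is compiled. The following is a minimal sketch of what the default (standard library) side of such a pair could look like, not this package's actual source. The helper name `wrapf` and the variable `errReset` are hypothetical, and the sibling file using github.com/pkg/errors is only described in a comment.

```go
//go:build !pkgerrors

// Default variant of a hypothetical errors file: compiled unless the
// pkgerrors tag is set. A sibling file guarded by "//go:build pkgerrors"
// would define the same names using github.com/pkg/errors instead.
package main

import (
	"errors"
	"fmt"
)

// errReset is a sample error used for illustration only.
var errReset = errors.New("connection reset")

// wrapf adds context to err using only the standard library.
// It returns nil when err is nil. The name is illustrative and is
// not taken from the amqp package.
func wrapf(err error, format string, args ...interface{}) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf("%s: %w", fmt.Sprintf(format, args...), err)
}

func main() {
	err := wrapf(errReset, "reading frame %d", 3)
	fmt.Println(err) // reading frame 3: connection reset
}
```

With a layout like this, `go build` compiles the file above, while `go build -tags pkgerrors` would exclude it and compile the tagged sibling instead, so callers of `wrapf` do not change.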