This repository has been archived by the owner on Sep 28, 2022. It is now read-only.
forked from vcabbage/amqp

AMQP 1.0 client library for Go.

github.com/nuclio/amqp

MIT licensed

github.com/nuclio/amqp is an AMQP 1.0 client implementation for Go.

AMQP 1.0 is not compatible with AMQP 0-9-1 or 0-10, which are the most common AMQP protocols in use today. A list of AMQP 1.0 brokers and other AMQP 1.0 resources can be found at github.com/xinchen10/awesome-amqp.
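One visible sign of this incompatibility is the protocol header that a client sends first on a new connection. The sketch below is illustrative only and is not part of this library: `classifyHeader` and the two variable names are hypothetical, and the byte values are the 8-byte headers for AMQP 1.0 and AMQP 0-9-1. The differences between the protocols go well beyond the header, so treat this as a minimal demonstration rather than a compatibility check.

```go
package main

import (
	"bytes"
	"fmt"
)

// Protocol headers sent by a client at the start of a connection.
var (
	// "AMQP", protocol ID 0, then version 1.0.0.
	headerAMQP10 = []byte{'A', 'M', 'Q', 'P', 0, 1, 0, 0}
	// "AMQP", then 0, 0, 9, 1 for AMQP 0-9-1.
	headerAMQP091 = []byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}
)

// classifyHeader is a hypothetical helper (not part of this library)
// that names the protocol version announced by an 8-byte header.
func classifyHeader(h []byte) string {
	switch {
	case bytes.Equal(h, headerAMQP10):
		return "AMQP 1.0"
	case bytes.Equal(h, headerAMQP091):
		return "AMQP 0-9-1"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classifyHeader(headerAMQP10))
	fmt.Println(classifyHeader(headerAMQP091))
}
```

A peer that does not support the announced version will typically reply with the header it does support and close the connection, which is why a 1.0 client cannot talk to a broker that only speaks 0-9-1.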

This project is in alpha, though my employer is currently using it in a pre-production capacity.

The API is subject to change until 1.0.0. If you choose to use this library, please vendor it.

Install

go get -u github.com/nuclio/amqp

Contributing

I'm happy to accept contributions. A proper CONTRIBUTING.md is in the works. In the interim please open an issue before beginning work so we can discuss it. I want to ensure there is no duplication of effort and that any new functionality fits with the goals of the project.

Example Usage

package main

import (
	"context"
	"fmt"
	"log"
	"time" // needed for the send timeout below

	"github.com/nuclio/amqp"
)

func main() {
	// Create client
	client, err := amqp.Dial("amqps://my-namespace.servicebus.windows.net",
		amqp.ConnSASLPlain("access-key-name", "access-key"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	ctx := context.Background()

	// Send a message
	{
		// Create a sender
		sender, err := session.NewSender(
			amqp.LinkTargetAddress("/queue-name"),
		)
		if err != nil {
			log.Fatal("Creating sender link:", err)
		}

		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

		// Send message
		err = sender.Send(ctx, &amqp.Message{
			Data: []byte("Hello!"),
		})
		if err != nil {
			log.Fatal("Sending message:", err)
		}

		cancel()
		sender.Close()
	}

	// Continuously read messages
	{
		// Create a receiver
		receiver, err := session.NewReceiver(
			amqp.LinkSourceAddress("/queue-name"),
			amqp.LinkCredit(10),
		)
		if err != nil {
			log.Fatal("Creating receiver link:", err)
		}

		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		for {
			// Receive next message
			msg, err := receiver.Receive(ctx)
			if err != nil {
				log.Fatal("Reading message from AMQP:", err)
			}

			// Accept message
			msg.Accept()

			fmt.Printf("Message received: %s\n", msg.Data)
		}
	}
}

Notable Bugs/Shortcomings

  • Closing a session does not send an end performative.
  • Testing should be improved. Currently fuzz testing and basic Azure Service Bus integration testing are being performed.

Features - Short Term

  • Set sender filters to support Azure Event Hubs. (Supported as of 0.3.0)

Features - Medium Term

  • Support message producer operations. (Supported as of 0.2.0)

Other Notes

By default, this package depends only on the standard library. Building with the pkgerrors tag will cause errors to be created/wrapped by the github.com/pkg/errors library. This can be useful for debugging and when used in a project using github.com/pkg/errors.
