Abstraction of various (cloud) pub/sub systems.
A quick glance:
interface Publisher<Message> {
function publish(message:Message):Promise<Noise>;
}
interface Subscriber<Message> {
function subscribe(handler:Envelope<T>->Void):Subscription;
}
interface Envelope<Message> {
final content:Outcome<Message, Error>;
function ack():Void;
function nack():Void;
}
Define two interfaces containing all the publishers and subscribers needed.
Then, pick an implementation class and use the interfaces as type parameters, wrapped by why.PubSub
.
Example:
var amqp = new why.pubsub.amqp.Amqp<MyPubSub>(...);
var pubsub:MyPubSub = amqp; // ok, the instance will implement the specified interface
typedef MyPubSub = why.PubSub<Publishers, Subscribers>;
interface Publishers {
@:pubsub({serialize: v -> haxe.Serializer.run(v)})
@:pubsub.amqp({exchange: 'foo', routingKey: ''})
var foo(get, never):Publisher<{foo:Int, bar:String}>;
@:pubsub({serialize: v -> haxe.Serializer.run(v)})
@:pubsub.amqp({exchange: 'cache', routingKey: 'cache.$id'})
@:pubsub.cache({key: id + foo + bar})
function cache(id:String, foo:Int, bar:Bool):Publisher<{foo:Int, bar:String}>;
}
interface Subscribers {
@:pubsub({unserialize: v -> tink.core.Error.catchExceptions(haxe.Unserializer.run.bind(v))})
@:pubsub.amqp({queue: 'bar', prefetch: 2})
var bar(get, never):Subscriber<{foo:Int, bar:String}>;
@:pubsub({unserialize: v -> tink.core.Error.catchExceptions(haxe.Unserializer.run.bind(v))})
@:pubsub.amqp({queue: 'cache_$id', prefetch: 2})
@:pubsub.cache({key: id + foo + bar})
function cache(id:String, foo:Int, bar:Bool):Subscriber<{foo:Int, bar:String}>;
}
Node.js only. Based on the npm package amqplib
for the AMQP 0-9-1 protocol. Compatible to RabbitMQ.
Use the .sync({...})
function to set up the exchanges and queues in the broker.
A simple in-memory message queue. Mainly for local testing.
@:pubsub
on Publisher: (required for all implementations)
{
// serialize message into binary
final serialize:T->Chunk;
}
@:pubsub
on Subscriber: (required for all implementations)
{
// unserialize binary into message
final unserialize:Chunk->Outcome<T, Error>;
}
@:pubsub.cache
on Publisher & Subscriber: (optional for all implementations)
Only applicable to functions. If not specified, every time the function gets called, a new instance will be returned. Otherwise, the returned value will be cached by the key specified.
{
// cache key. function arguments are accessible here
final key:String;
}
@:pubsub.local
on Publisher: (required for Local
)
{
// publish messages to the specified queue name(s)
final to:EitherType<String, Array<String>>;
}
@:pubsub.local
on Subscriber: (required for Local
)
{
// subscribe to messages in the specified queue name
final to:String;
}
@:pubsub.amqp
on Publisher: (required for Amqp
)
{
// publish messages to this exchange
final exchange:String;
// publish messages with this routing key
final routingKey:String;
}
@:pubsub.amqp
on Subscriber: (required for Amqp
)
{
// subscribe to messages in this queue
final queue:String;
// max "in-flight" messages for this subscription (see amqp doc for more info)
final prefetch:Int;
}
- GCP PubSub
- AWS SQS