akka-streams-utils

This repository collects Akka Streams components that Powerspace uses across its projects.

AckConsumableAkkaSource

A generic source that pulls messages from an underlying consumer and acks them in the background once a threshold is reached (elapsed time or message count).

Example:

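import akka.stream.scaladsl.Source
import scala.concurrent.duration._ // enables the `2.seconds` syntax

val source = Source.fromGraph(new AckConsumableAkkaSource(consumer,
                                                          ackMaxSize = 1000,
                                                          ackPeriod = 2.seconds))
source.map(_ + 1).runForeach(println)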

The consumer is acked every 1000 items, or every 2 seconds if fewer than 1000 items have arrived in that time.
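The count-or-time rule above can be sketched without Akka. The `AckBatcher` class below is hypothetical and is not part of akka-streams-utils; it only illustrates one way the two thresholds could interact, assuming the timer restarts after every flush (the README does not say whether it does).

```scala
import scala.concurrent.duration._

// Hypothetical helper, not the library's implementation: collects keys that
// still need an ack and reports when a batch is due, by count or by time.
final class AckBatcher(ackMaxSize: Int, ackPeriod: FiniteDuration, startNanos: Long) {
  private var keys: Vector[String] = Vector.empty
  private var lastFlushNanos: Long = startNanos

  // Records a key; returns the batch to ack once ackMaxSize keys are pending.
  def add(key: String, nowNanos: Long): Option[Vector[String]] = {
    keys = keys :+ key
    if (keys.size >= ackMaxSize) Some(flush(nowNanos)) else None
  }

  // Meant to be called periodically; returns the pending batch if ackPeriod
  // has elapsed since the last flush and at least one key is pending.
  def tick(nowNanos: Long): Option[Vector[String]] =
    if (keys.nonEmpty && nowNanos - lastFlushNanos >= ackPeriod.toNanos) Some(flush(nowNanos))
    else None

  // Empties the buffer and restarts the timer (an assumption of this sketch).
  private def flush(nowNanos: Long): Vector[String] = {
    val batch = keys
    keys = Vector.empty
    lastFlushNanos = nowNanos
    batch
  }
}
```

With `ackMaxSize = 1000` and `ackPeriod = 2.seconds`, `add` returns a batch on the 1000th pending key, and `tick` returns whatever is pending once 2 seconds have passed without a flush.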

The consumer must implement the AckConsumableStorage interface, defined as:

trait AckConsumableStorage[T] extends EventsStorage[T] with ConsumableStorage[T] with AckableStorage[T]

// every message must have a distinct key to be ackable
case class KeyedMessage[T](key: String, data: T)

trait EventsStorage[T] {
  def name: String
}

trait ConsumableStorage[T] { self: EventsStorage[T] =>
  def consumeAsBytes(): Future[List[KeyedMessage[Array[Byte]]]]
  def consumeAsEvents(): Future[List[KeyedMessage[T]]]
}

trait AckableStorage[T] { self: EventsStorage[T] =>
  def ack(ids: Seq[String]): Future[Unit]
}
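As an illustration of what an implementation might look like, here is a minimal in-memory sketch. The traits are copied from the definitions above so that the block compiles on its own; `InMemoryStringStorage`, its `batchSize` parameter, and `unackedKeys` are hypothetical names introduced for this example and do not exist in the library.

```scala
import java.nio.charset.StandardCharsets
import scala.concurrent.Future

// Copied from the README so the sketch is self-contained.
case class KeyedMessage[T](key: String, data: T)

trait EventsStorage[T] { def name: String }

trait ConsumableStorage[T] { self: EventsStorage[T] =>
  def consumeAsBytes(): Future[List[KeyedMessage[Array[Byte]]]]
  def consumeAsEvents(): Future[List[KeyedMessage[T]]]
}

trait AckableStorage[T] { self: EventsStorage[T] =>
  def ack(ids: Seq[String]): Future[Unit]
}

trait AckConsumableStorage[T] extends EventsStorage[T] with ConsumableStorage[T] with AckableStorage[T]

// Hypothetical storage: hands out messages in batches of `batchSize` and
// remembers each one until its key is acked.
class InMemoryStringStorage(initial: List[KeyedMessage[String]], batchSize: Int)
    extends AckConsumableStorage[String] {

  private var pending: List[KeyedMessage[String]] = initial
  private var unacked: Map[String, KeyedMessage[String]] = Map.empty

  val name: String = "in-memory"

  // Takes the next batch off the pending list and marks it as un-acked.
  private def nextBatch(): List[KeyedMessage[String]] = synchronized {
    val (batch, rest) = pending.splitAt(batchSize)
    pending = rest
    unacked = unacked ++ batch.map(m => m.key -> m)
    batch
  }

  def consumeAsEvents(): Future[List[KeyedMessage[String]]] =
    Future.successful(nextBatch())

  def consumeAsBytes(): Future[List[KeyedMessage[Array[Byte]]]] =
    Future.successful(nextBatch().map(m => KeyedMessage(m.key, m.data.getBytes(StandardCharsets.UTF_8))))

  // Forgets the given keys; unknown keys are ignored.
  def ack(ids: Seq[String]): Future[Unit] = synchronized {
    unacked = unacked -- ids
    Future.successful(())
  }

  // Keys that were consumed but not acked yet (for inspection only).
  def unackedKeys: Set[String] = synchronized(unacked.keySet)
}
```

A real storage would typically redeliver messages that are never acked; this sketch only tracks them so the effect of `ack` is visible.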

This is particularly useful with Google PubSub, for instance, where messages need to be acked in batches.

Note: consumeAsBytes may not be necessary and may be removed in the future.

TODO

  • Add the publish piece into build.sbt
  • Add PubSub consumer