Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instances for com.twitter.concurrent.AsyncStream #62

Merged
merged 9 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ scala:
jdk:
- oraclejdk8

sbt_args: -J-Xmx4096M

install:
- pip install --user codecov

Expand Down
47 changes: 47 additions & 0 deletions util/src/main/scala/io/catbird/util/asyncstream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.catbird
package util

import cats.{ CoflatMap, Eq, Monoid, StackSafeMonad, Semigroup }
import com.twitter.concurrent._
import com.twitter.util._

trait AsyncStreamInstances extends AsyncStreamInstances1 {

implicit final val asyncStreamInstances: StackSafeMonad[AsyncStream] with CoflatMap[AsyncStream] =
new AsyncStreamCoflatMap with StackSafeMonad[AsyncStream] {
final def pure[A](a: A): AsyncStream[A] = AsyncStream.of(a)
final def flatMap[A, B](fa: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] = fa.flatMap(f)
override final def map[A, B](fa: AsyncStream[A])(f: A => B): AsyncStream[B] = fa.map(f)
}

implicit final def asyncStreamSemigroup[A](implicit A: Semigroup[A]): Semigroup[AsyncStream[A]] =
new AsyncStreamSemigroup[A]

final def asyncStreamEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[AsyncStream[A]] = new Eq[AsyncStream[A]] {
final def eqv(x: AsyncStream[A], y: AsyncStream[A]): scala.Boolean = Await.result(
x.take(1).toSeq.join(y.take(1).toSeq).map { case (x, y) => x == y },
atMost
)
}

}

trait AsyncStreamInstances1 {

implicit final def asyncStreamMonoid[A](implicit M: Monoid[A]): Monoid[AsyncStream[A]] =
new AsyncStreamSemigroup[A] with Monoid[AsyncStream[A]] {
final def empty: AsyncStream[A] = AsyncStream(M.empty)
}
}

private[util] abstract class AsyncStreamCoflatMap extends CoflatMap[AsyncStream] {
final def coflatMap[A, B](fa: AsyncStream[A])(f: AsyncStream[A] => B): AsyncStream[B] = AsyncStream(f(fa))

}

private[util] class AsyncStreamSemigroup[A](implicit A: Semigroup[A]) extends Semigroup[AsyncStream[A]] {
final def combine(fa: AsyncStream[A], fb: AsyncStream[A]): AsyncStream[A] = fa.flatMap { a =>
fb.map( b => A.combine(a,b) )
}

}
2 changes: 1 addition & 1 deletion util/src/main/scala/io/catbird/util/package.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package io.catbird

package object util extends FutureInstances with TryInstances with VarInstances
package object util extends FutureInstances with TryInstances with VarInstances with AsyncStreamInstances
15 changes: 4 additions & 11 deletions util/src/main/scala/io/catbird/util/var.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package io.catbird.util

import cats.{ CoflatMap, Comonad, Eq, Monad, Monoid, Semigroup }
import cats.{ CoflatMap, Comonad, Eq, StackSafeMonad, Monoid, Semigroup }
import com.twitter.util.Var
import scala.Boolean
import scala.util.{ Either, Left, Right }

trait VarInstances extends VarInstances1 {
implicit final val twitterVarInstance: Monad[Var] with CoflatMap[Var] =
new VarCoflatMap with Monad[Var] {
implicit final val twitterVarInstance: StackSafeMonad[Var] with CoflatMap[Var] =
new VarCoflatMap with StackSafeMonad[Var] {
final def pure[A](x: A): Var[A] = Var.value(x)
final def flatMap[A, B](fa: Var[A])(f: A => Var[B]): Var[B] = fa.flatMap(f)
override final def map[A, B](fa: Var[A])(f: A => B): Var[B] = fa.map(f)
Expand Down Expand Up @@ -39,15 +38,9 @@ trait VarInstances1 {
}

private[util] abstract class VarCoflatMap extends CoflatMap[Var] {

final def coflatMap[A, B](fa: Var[A])(f: Var[A] => B): Var[B] = Var(f(fa))

/**
* Note that this implementation is not stack-safe.
*/
final def tailRecM[A, B](a: A)(f: A => Var[Either[A, B]]): Var[B] = f(a).flatMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the law-checking for Var is still using StackUnsafeMonad, though?

Copy link
Contributor Author

@crispywalrus crispywalrus Mar 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it still is. The easy fix of using StackSafeMonad failed and rather than leave the code seeming to be be stack safe I left it unchanged (and still commented). I didn't have the time at hand to dive into Var to understand why it's not behaving. I think I should early this week

case Left(a1) => tailRecM(a1)(f)
case Right(b) => Var.value(b)
}
}

private[util] class VarSemigroup[A](implicit A: Semigroup[A]) extends Semigroup[Var[A]] {
Expand Down
4 changes: 4 additions & 0 deletions util/src/test/scala/io/catbird/util/arbitrary.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.catbird.util

import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import com.twitter.util.{ Future, Return, Try, Var }
import org.scalacheck.{ Arbitrary, Cogen }
Expand All @@ -14,6 +15,9 @@ trait ArbitraryInstances {
implicit def varArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Var[A]] =
Arbitrary(A.arbitrary.map(Var.value))

implicit def asyncStreamArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[AsyncStream[A]] =
Arbitrary(Arbitrary.arbitrary[Stream[A]].map(AsyncStream.fromSeq))

implicit def rerunnableArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Rerunnable[A]] =
Arbitrary(futureArbitrary[A].arbitrary.map(Rerunnable.fromFuture[A](_)))

Expand Down
24 changes: 24 additions & 0 deletions util/src/test/scala/io/catbird/util/asyncstream.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.catbird
package util

import cats.Eq
import cats.instances.int._
import cats.instances.tuple._
import cats.kernel.laws.discipline.{MonoidTests, SemigroupTests}
import cats.laws.discipline._
import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.time._
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

class AsyncStreamSuite extends FunSuite with Discipline with AsyncStreamInstances with ArbitraryInstances {

implicit val eqAsyncStreamInt: Eq[AsyncStream[Int]] = asyncStreamEq(1.second)
implicit val eqAsyncStreamAsyncStreamInt: Eq[AsyncStream[AsyncStream[Int]]] = asyncStreamEq(1.second)
implicit val eqAsyncStreamIntIntInt: Eq[AsyncStream[(Int,Int,Int)]] = asyncStreamEq[(Int,Int,Int)](1.second)

checkAll("AsyncStream[Int]", MonadTests[AsyncStream].monad[Int, Int, Int])
checkAll("AsyncStream[Int]", SemigroupTests[AsyncStream[Int]](asyncStreamSemigroup[Int]).semigroup)
checkAll("AsyncStream[Int]", MonoidTests[AsyncStream[Int]].monoid)

}