-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Instances for com.twitter.concurrent.AsyncStream #62
Merged
Merged
Changes from 6 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
f3033c5
add some instances for com.twitter.concurrent.AsyncStream
crispywalrus f893681
now with a stack safe monad implementation
crispywalrus 1512a38
add Monoid instance
crispywalrus 4bef425
use a better arb constructor than of
crispywalrus 211f755
use fromSeq in arbitrary
crispywalrus 4567880
Update .travis.yml
crispywalrus 2cf6862
Update .travis.yml
crispywalrus 2296d1d
return stack unsafe version of tailRecM
2748bf1
remove instance for CoflatMap
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ scala: | |
jdk: | ||
- oraclejdk8 | ||
|
||
sbt_args: -J-Xmx4096M | ||
|
||
install: | ||
- pip install --user codecov | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package io.catbird | ||
package util | ||
|
||
import cats.{ CoflatMap, Eq, Monoid, StackSafeMonad, Semigroup } | ||
import com.twitter.concurrent._ | ||
import com.twitter.util._ | ||
|
||
trait AsyncStreamInstances extends AsyncStreamInstances1 { | ||
|
||
implicit final val asyncStreamInstances: StackSafeMonad[AsyncStream] with CoflatMap[AsyncStream] = | ||
new AsyncStreamCoflatMap with StackSafeMonad[AsyncStream] { | ||
final def pure[A](a: A): AsyncStream[A] = AsyncStream.of(a) | ||
final def flatMap[A, B](fa: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] = fa.flatMap(f) | ||
override final def map[A, B](fa: AsyncStream[A])(f: A => B): AsyncStream[B] = fa.map(f) | ||
} | ||
|
||
implicit final def asyncStreamSemigroup[A](implicit A: Semigroup[A]): Semigroup[AsyncStream[A]] = | ||
new AsyncStreamSemigroup[A] | ||
|
||
final def asyncStreamEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[AsyncStream[A]] = new Eq[AsyncStream[A]] { | ||
final def eqv(x: AsyncStream[A], y: AsyncStream[A]): scala.Boolean = Await.result( | ||
x.take(1).toSeq.join(y.take(1).toSeq).map { case (x, y) => x == y }, | ||
atMost | ||
) | ||
} | ||
|
||
} | ||
|
||
trait AsyncStreamInstances1 { | ||
|
||
implicit final def asyncStreamMonoid[A](implicit M: Monoid[A]): Monoid[AsyncStream[A]] = | ||
new AsyncStreamSemigroup[A] with Monoid[AsyncStream[A]] { | ||
final def empty: AsyncStream[A] = AsyncStream(M.empty) | ||
} | ||
} | ||
|
||
private[util] abstract class AsyncStreamCoflatMap extends CoflatMap[AsyncStream] { | ||
final def coflatMap[A, B](fa: AsyncStream[A])(f: AsyncStream[A] => B): AsyncStream[B] = AsyncStream(f(fa)) | ||
|
||
} | ||
|
||
private[util] class AsyncStreamSemigroup[A](implicit A: Semigroup[A]) extends Semigroup[AsyncStream[A]] { | ||
final def combine(fa: AsyncStream[A], fb: AsyncStream[A]): AsyncStream[A] = fa.flatMap { a => | ||
fb.map( b => A.combine(a,b) ) | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
package io.catbird | ||
|
||
package object util extends FutureInstances with TryInstances with VarInstances | ||
package object util extends FutureInstances with TryInstances with VarInstances with AsyncStreamInstances |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package io.catbird | ||
package util | ||
|
||
import cats.Eq | ||
import cats.instances.int._ | ||
import cats.instances.tuple._ | ||
import cats.kernel.laws.discipline.{MonoidTests, SemigroupTests} | ||
import cats.laws.discipline._ | ||
import com.twitter.concurrent.AsyncStream | ||
import com.twitter.conversions.time._ | ||
import org.scalatest.FunSuite | ||
import org.typelevel.discipline.scalatest.Discipline | ||
|
||
class AsyncStreamSuite extends FunSuite with Discipline with AsyncStreamInstances with ArbitraryInstances { | ||
|
||
implicit val eqAsyncStreamInt: Eq[AsyncStream[Int]] = asyncStreamEq(1.second) | ||
implicit val eqAsyncStreamAsyncStreamInt: Eq[AsyncStream[AsyncStream[Int]]] = asyncStreamEq(1.second) | ||
implicit val eqAsyncStreamIntIntInt: Eq[AsyncStream[(Int,Int,Int)]] = asyncStreamEq[(Int,Int,Int)](1.second) | ||
|
||
checkAll("AsyncStream[Int]", MonadTests[AsyncStream].monad[Int, Int, Int]) | ||
checkAll("AsyncStream[Int]", SemigroupTests[AsyncStream[Int]](asyncStreamSemigroup[Int]).semigroup) | ||
checkAll("AsyncStream[Int]", MonoidTests[AsyncStream[Int]].monoid) | ||
|
||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the law-checking for
Var
is still usingStackUnsafeMonad
, though?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it still is. The easy fix of using
StackSafeMonad
failed and rather than leave the code seeming to be be stack safe I left it unchanged (and still commented). I didn't have the time at hand to dive intoVar
to understand why it's not behaving. I think I should early this week