aboutsummaryrefslogtreecommitdiff
path: root/fs2/src/main
diff options
context:
space:
mode:
authorAmir Saeid <amir@glgdgt.com>2026-02-14 16:18:58 +0000
committerAmir Saeid <amir@glgdgt.com>2026-02-14 16:18:58 +0000
commit283d4eca6ac24391a5dc86cc3d55175ee5f78741 (patch)
treebe7f8f691da514917efbe0e948e09bd76c2cf673 /fs2/src/main
parent1fa98aed370d81b656d8e32c44f7bafa40be12b1 (diff)
Allow converting Fair/FairT to fs2.Stream
Diffstat (limited to 'fs2/src/main')
-rw-r--r--fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala34
-rw-r--r--fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala18
2 files changed, 52 insertions, 0 deletions
diff --git a/fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala b/fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala
new file mode 100644
index 0000000..4083c31
--- /dev/null
+++ b/fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala
@@ -0,0 +1,34 @@
+package com.codiff.fairstream
+package fs2
+
+import cats.Monad
+import _root_.fs2.{Pull, Pure, Stream}
+
+object conversions {
+
+ def fairToStream[A](fair: Fair[A]): Stream[Pure, A] = {
+ def go(f: Fair[A]): Pull[Pure, A, Unit] = f match {
+ case Fair.Nil => Pull.done
+ case Fair.One(a) => Pull.output1(a)
+ case c: Fair.Choice[A @unchecked] => Pull.output1(c.a) >> go(c.rest)
+ case i: Fair.Incomplete[A @unchecked] => go(i.step)
+ }
+ go(fair).stream
+ }
+
+ def fairTToStream[F[_], A](fairT: FairT[F, A])(implicit
+ F: Monad[F]
+ ): Stream[F, A] = {
+ def go(ft: FairT[F, A]): Pull[F, A, Unit] =
+ Pull.eval(ft.run).flatMap {
+ case FairE.Nil() => Pull.done
+ case FairE.One(a) => Pull.output1(a)
+ case c: FairE.Choice[F, A] @unchecked =>
+ Pull.output1(c.a) >> go(c.rest)
+ case inc: FairE.Incomplete[F, A] @unchecked =>
+ go(inc.rest)
+ }
+ go(fairT).stream
+ }
+
+}
diff --git a/fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala b/fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala
new file mode 100644
index 0000000..1c7aba7
--- /dev/null
+++ b/fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala
@@ -0,0 +1,18 @@
+package com.codiff.fairstream
+package fs2
+
+import cats.Monad
+import _root_.fs2.{Pure, Stream}
+
+object syntax {
+
+ implicit class FairFs2Ops[A](val fair: Fair[A]) extends AnyVal {
+ def toFs2: Stream[Pure, A] = conversions.fairToStream(fair)
+ }
+
+ implicit class FairTFs2Ops[F[_], A](val fairT: FairT[F, A]) extends AnyVal {
+ def toFs2(implicit F: Monad[F]): Stream[F, A] =
+ conversions.fairTToStream(fairT)
+ }
+
+}