diff options
| author | Amir Saeid <amir@glgdgt.com> | 2026-02-14 16:18:58 +0000 |
|---|---|---|
| committer | Amir Saeid <amir@glgdgt.com> | 2026-02-14 16:18:58 +0000 |
| commit | 283d4eca6ac24391a5dc86cc3d55175ee5f78741 (patch) | |
| tree | be7f8f691da514917efbe0e948e09bd76c2cf673 /fs2/src/main/scala/com | |
| parent | 1fa98aed370d81b656d8e32c44f7bafa40be12b1 (diff) | |
Allow converting Fair/FairT to fs2.Stream
Diffstat (limited to 'fs2/src/main/scala/com')
| -rw-r--r-- | fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala | 34 | ||||
| -rw-r--r-- | fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala | 18 |
2 files changed, 52 insertions, 0 deletions
diff --git a/fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala b/fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala new file mode 100644 index 0000000..4083c31 --- /dev/null +++ b/fs2/src/main/scala/com/codiff/fairstream/fs2/conversions.scala @@ -0,0 +1,34 @@ +package com.codiff.fairstream +package fs2 + +import cats.Monad +import _root_.fs2.{Pull, Pure, Stream} + +object conversions { + + def fairToStream[A](fair: Fair[A]): Stream[Pure, A] = { + def go(f: Fair[A]): Pull[Pure, A, Unit] = f match { + case Fair.Nil => Pull.done + case Fair.One(a) => Pull.output1(a) + case c: Fair.Choice[A @unchecked] => Pull.output1(c.a) >> go(c.rest) + case i: Fair.Incomplete[A @unchecked] => go(i.step) + } + go(fair).stream + } + + def fairTToStream[F[_], A](fairT: FairT[F, A])(implicit + F: Monad[F] + ): Stream[F, A] = { + def go(ft: FairT[F, A]): Pull[F, A, Unit] = + Pull.eval(ft.run).flatMap { + case FairE.Nil() => Pull.done + case FairE.One(a) => Pull.output1(a) + case c: FairE.Choice[F, A] @unchecked => + Pull.output1(c.a) >> go(c.rest) + case inc: FairE.Incomplete[F, A] @unchecked => + go(inc.rest) + } + go(fairT).stream + } + +} diff --git a/fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala b/fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala new file mode 100644 index 0000000..1c7aba7 --- /dev/null +++ b/fs2/src/main/scala/com/codiff/fairstream/fs2/syntax.scala @@ -0,0 +1,18 @@ +package com.codiff.fairstream +package fs2 + +import cats.Monad +import _root_.fs2.{Pure, Stream} + +object syntax { + + implicit class FairFs2Ops[A](val fair: Fair[A]) extends AnyVal { + def toFs2: Stream[Pure, A] = conversions.fairToStream(fair) + } + + implicit class FairTFs2Ops[F[_], A](val fairT: FairT[F, A]) extends AnyVal { + def toFs2(implicit F: Monad[F]): Stream[F, A] = + conversions.fairTToStream(fairT) + } + +} |
