Pushing elements externally to a reactive stream in fs2

I have an external (that is, I cannot change it) Java API which looks like this:

public interface Sender {
    void send(Event e);
}

I need to implement a Sender which accepts each event, transforms it into a JSON object, collects some number of them into a single bundle and sends the bundle over HTTP to some endpoint. All of this should be done asynchronously: send() must not block the calling thread, events go into a fixed-size buffer, and new events are dropped if the buffer is full.
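The non-blocking, drop-when-full contract that send() has to satisfy can be illustrated with nothing but the JDK. This is a minimal sketch, not part of the original question: `DroppingBuffer` and the `Event` case class are hypothetical names, and `ArrayBlockingQueue.offer` is used because it returns `false` instead of blocking when the queue is full.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical stand-in for the real (Java) Event type.
final case class Event(id: Int)

// Hypothetical helper: a fixed-capacity buffer whose offer never blocks.
// When the buffer is full, the new event is rejected (dropped) and the
// caller gets control back immediately.
final class DroppingBuffer(capacity: Int) {
  private val queue = new ArrayBlockingQueue[Event](capacity)

  /** Returns true if the event was buffered, false if it was dropped. */
  def offer(e: Event): Boolean = queue.offer(e)

  /** Removes up to `max` buffered events, oldest first, without waiting. */
  def drain(max: Int): List[Event] =
    List.fill(max)(Option(queue.poll())).flatten
}

object DroppingBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new DroppingBuffer(2)
    println(buffer.offer(Event(1))) // true
    println(buffer.offer(Event(2))) // true
    println(buffer.offer(Event(3))) // false: buffer full, event dropped
    println(buffer.drain(10))       // List(Event(1), Event(2))
  }
}
```

The hard part of the question is not this buffer itself but connecting its producer side (send()) to a consumer that lives inside an fs2 stream.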

With akka-streams this is quite simple: I create a graph of stages (which uses akka-http to send HTTP requests), materialize it and use the materialized ActorRef to push new events to the stream:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def send(e: Event): Unit = {
  eventsActor ! e
}

Here CustomBuffer is a custom GraphStage which is very similar to the library-provided Buffer but tailored to our specific needs; it probably does not matter for this particular question.

As you can see, interacting with the stream from non-stream code is very simple - the ! method on the ActorRef trait is asynchronous and does not need any additional machinery to be called. Each event which is sent to the actor is then processed through the entire reactive pipeline. Moreover, because of how akka-http is implemented, I even get connection pooling for free, so no more than one connection is opened to the server.

However, I cannot find a way to do the same thing with FS2 properly. Even discarding the question of buffering (I will probably need to write a custom Pipe implementation which does additional things that we need) and HTTP connection pooling, I'm still stuck with a more basic thing - that is, how to push the data to the reactive stream "from outside".

All tutorials and documentation that I can find assume that the entire program runs inside some effect context, usually IO. This is not my case: the send() method is invoked by the Java library at unspecified times. Therefore I cannot keep everything inside one IO action; I have to complete the "push" action inside the send() method and keep the reactive stream as a separate entity, because I want to aggregate events and hopefully pool HTTP connections (which I believe is naturally tied to the reactive stream).

I assume that I need some additional data structure, like Queue. fs2 does indeed have some kind of fs2.concurrent.Queue, but again, all documentation shows how to use it inside a single IO context, so I assume that doing something like

val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()

and then using queue inside the stream definition and then separately inside the send() method with further unsafeRun calls:

val eventPipeline = queue.dequeue
  .through(customBuffer(bufferSize))
  .groupWithin(batchSize, flushDuration)
  .map(toBundle)
  .mapAsyncUnordered(1)(sendRequest)
  .evalTap(response => ...)
  .compile
  .drain

eventPipeline.unsafeRunAsync(...)  // or something

override def send(e: Event): Unit = {
  queue.enqueue1(e).unsafeRunSync()
}

is not the correct way and most likely would not even work.

So, my question is, how do I properly use fs2 to solve my problem?

Slavism answered 30/11, 2018 at 8:57

Consider the following example:

import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Answer {
  type Event = String

  trait Sender {
    def send(event: Event): Unit
  }

  def main(args: Array[String]): Unit = {
    val sender: Sender = {
      val ec = ExecutionContext.global
      implicit val cs: ContextShift[IO] = IO.contextShift(ec)
      implicit val timer: Timer[IO] = IO.timer(ec)

      fs2Sender[IO](2)
    }

    val events = List("a", "b", "c", "d")
    events.foreach { evt => new Thread(() => sender.send(evt)).start() }
    Thread sleep 3000
  }

  def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
    // dummy impl
    // this is where the actual logic for batching
    //   and shipping over the network would live
    val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
      for {
        _ <- F.delay { println(s"consuming [$event]...") }
        _ <- Timer[F].sleep(1.seconds)
        _ <- F.delay { println(s"...[$event] consumed") }
      } yield ()
    }

    val suspended = for {
      q <- Queue.bounded[F, Event](maxBufferedSize)
      _ <- q.dequeue.through(consume).compile.drain.start
      sender <- F.delay[Sender] { evt =>
        val enqueue = for {
          wasEnqueued <- q.offer1(evt)
          _ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
        } yield ()
        enqueue.toIO.unsafeRunAsyncAndForget()
      }
    } yield sender

    suspended.toIO.unsafeRunSync()
  }
}

The main idea is to use a concurrent Queue from fs2. Note that the code above assumes that neither the Sender interface nor the logic in main can be changed; only the implementation of the Sender interface is swapped out.
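`ConcurrentEffect`, `ContextShift`, `toIO` and `fs2.concurrent.Queue` belong to cats-effect 2 and fs2 1.x/2.x and no longer exist in current versions. The following is a minimal sketch of the same idea, assuming cats-effect 3.4+ and fs2 3.x, where `Dispatcher` takes the place of `toIO.unsafeRunAsyncAndForget()` for running effects from impure code. It also uses `tryOffer`, so a full buffer drops the event instead of making the caller wait, and `groupWithin` for batching. `DispatcherAnswer`, the batch size of 10 and the 1-second window are illustrative choices, not part of the original answer.

```scala
import cats.effect.{IO, IOApp, Resource}
import cats.effect.std.{Dispatcher, Queue}
import fs2.Stream

import scala.concurrent.duration._

object DispatcherAnswer extends IOApp.Simple {
  type Event = String

  trait Sender {
    def send(event: Event): Unit
  }

  // Allocates the queue exactly once, starts the consumer in the background
  // and hands out a Sender whose send() never blocks the calling thread.
  def fs2Sender(maxBufferedSize: Int): Resource[IO, Sender] =
    for {
      dispatcher <- Dispatcher.parallel[IO]
      q <- Resource.eval(Queue.bounded[IO, Event](maxBufferedSize))
      // Hypothetical consumer: batch up to 10 events or 1 second, then "ship" them.
      _ <- Stream
        .fromQueueUnterminated(q)
        .groupWithin(10, 1.second)
        .evalMap(chunk => IO.println(s"shipping ${chunk.toList}"))
        .compile
        .drain
        .background
    } yield new Sender {
      def send(event: Event): Unit =
        // tryOffer returns false instead of waiting when the queue is full.
        dispatcher.unsafeRunAndForget(
          q.tryOffer(event).flatMap { ok =>
            if (ok) IO.unit else IO.println(s"[$event] dropped")
          }
        )
    }

  def run: IO[Unit] =
    fs2Sender(2).use { sender =>
      IO(List("a", "b", "c", "d").foreach(sender.send)) >> IO.sleep(2.seconds)
    }
}
```

Wrapping everything in a `Resource` means the background consumer is cancelled and the dispatcher is shut down when `use` finishes, which maps naturally onto a close/shutdown hook in the Java API.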

Maddy answered 2/2, 2019 at 23:16
This is pretty much what I ended up doing, and what I originally described in my question as something which I thought wouldn't work. It actually did work, so I believe this is indeed the proper way to do this. - Slavism

I don't have much experience with this particular library, but it should look something like this:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.concurrent.Queue

case class Event(id: Int)

class JavaProducer {

  new Thread(new Runnable {
    override def run(): Unit = {
      var id = 0
      while (true) {
        Thread.sleep(1000)
        id += 1
        send(Event(id))
      }
    }
  }).start()

  def send(event: Event): Unit = {
    println(s"Original producer prints $event")
  }
}

class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
  override def send(event: Event): Unit = {
    println(s"Hacked producer pushes $event")
    queue.enqueue1(event).unsafeRunSync()
    println(s"Hacked producer pushes $event - Pushed")
  }

}

object Test extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val x: IO[Unit] = for {
      queue <- Queue.unbounded[IO, Event]
      _ = new HackedProducer(queue)
      done <- queue.dequeue.map(ev => {
        println(s"Got $ev")
      }).compile.drain
    } yield done
    x.map(_ => ExitCode.Success)
  }

}
Devilry answered 30/11, 2018 at 11:34
This is exactly the issue I'm talking about: I don't have any producer to initialize inside a separate thread. The Java library I'm interfacing with is this producer, and I don't have any control over how it is instantiated. - Slavism
Now I understand your problem. Then your initial solution could work if you don't call unsafeRunAsync right after initializing the queue, but only when you push an element and at the end. - Devilry
If I don't run unsafeRunAsync on eventPipeline (actually, in my current implementation I'll use unsafeRunCancelable because I need a way to shut this pipeline down due to the API requirements), then how is the stream going to start? Or if you meant not running Queue.unbounded[..].unsafeRunSync(), then how do I get the instance of the queue? Anyway, I'm currently in the process of trying it out. - Slavism
Yes, I mean not running Queue.unbounded[..].unsafeRunSync(). There is an example in my answer: once you are inside the for-comprehension, you are in the IO monad. - Devilry
Then my situation is still not solved :) How do I get the instance of the queue inside the send() method in this case? If I ran queue.unsafeRunSync() inside send(), I would get a separate instance of Queue on each send() invocation, which wouldn't be connected in any way to the event pipeline. - Slavism
queue.enqueue(e).unsafeRunSync() doesn't create a new queue. It just puts an element into this queue. - Devilry
Yes, I know; but how do I get the queue instance in the first place? Either by running queue.unsafeRunSync() or by otherwise tying it into an IO action; regardless of how I do it, it would still be a new queue instance for each send() call. - Slavism
In my example, in queue <- Queue.unbounded[IO, Event] the type of queue is just Queue, not IO[Queue], so you can pass it wherever you like. If it is still not clear, I'll try to write a complete example later. - Devilry
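The confusion in this comment thread comes down to the difference between a description of how to allocate a queue (`IO[Queue[IO, A]]`) and a queue value. Running the description twice allocates two unrelated queues; running it once and sharing the result gives one queue that both send() and the stream can use. A minimal sketch, assuming cats-effect 3's `cats.effect.std.Queue` (the fs2 1.x `fs2.concurrent.Queue` used in this thread behaves the same way in this respect); `QueueIdentity` and its methods are hypothetical names.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

object QueueIdentity {
  // Only a *description* of "allocate a new queue"; nothing exists yet.
  val makeQueue: IO[Queue[IO, Int]] = Queue.unbounded[IO, Int]

  // Running the description twice allocates two unrelated queues.
  def sizesAfterRunningTwice: (Int, Int) = {
    val q1 = makeQueue.unsafeRunSync()
    val q2 = makeQueue.unsafeRunSync()
    q1.offer(42).unsafeRunSync()
    (q1.size.unsafeRunSync(), q2.size.unsafeRunSync())
  }

  // Running it once and sharing the value gives one queue that both the
  // producer side and the consumer side see.
  def sharedQueueRoundTrip: Int = {
    val shared = makeQueue.unsafeRunSync()
    shared.offer(42).unsafeRunSync() // producer side, e.g. inside send()
    shared.take.unsafeRunSync()      // consumer side, e.g. the fs2 stream
  }

  def main(args: Array[String]): Unit = {
    println(sizesAfterRunningTwice) // (1,0)
    println(sharedQueueRoundTrip)   // 42
  }
}
```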

We can create a bounded queue that receives elements from the sender and makes them available to fs2 stream processing.


import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global // IORuntime required by unsafeRunSync

import fs2.Stream

trait Sender[T]:
    def send(e: T): Unit

object Sender:
    def apply[T](bufferSize: Int): IO[(Sender[T], Stream[IO, T])] =
        for
            q <- Queue.bounded[IO, T](bufferSize)
        yield
            // offer waits while the queue is full, so send() blocks the caller until there is room
            val sender: Sender[T] = (e: T) => q.offer(e).unsafeRunSync()
            def stm: Stream[IO, T] = Stream.eval(q.take) ++ stm
            (sender, stm)

This gives us two ends: one for the Java world, which sends new elements through the Sender, and another for stream processing in fs2.

import org.junit.Test // assuming JUnit 4
import cats.effect.IO
import cats.effect.unsafe.implicits.global

class TestSenderQueue:

    @Test def testSenderQueue: Unit =
        val (sender, stream) = Sender[Int](1)
          .unsafeRunSync() // run it up front so that `sender` is available to the external system

        val processing =
            stream
                .map(i => i * i)
                .evalMap { ii => IO { println(ii) } }
        sender.send(1)

        processing.compile.toList.start // NB! start processing in a separate fiber
            .unsafeRunSync()            // right now
        sender.send(2)
        Thread.sleep(100)
        (0 until 100).foreach(sender.send)
        println("finished")

Note that we push data from the current thread, so the fs2 processing has to run in a separate fiber (.start).
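The asker's comments mention needing a way to shut the pipeline down. A minimal sketch of that, assuming cats-effect 3 and fs2 3 as in this answer: instead of `.start(...).unsafeRunSync()`, `IO#unsafeRunCancelable()` starts the consumer and returns a `() => Future[Unit]` that cancels it, which non-IO code (for example a shutdown hook) can call later. `CancellableConsumer` and the queue capacity of 16 are hypothetical; `tryOffer` is used instead of `offer` so the producer never waits when the queue is full.

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import fs2.Stream

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object CancellableConsumer {
  // Hypothetical demo: start the fs2 consumer from plain (non-IO) code and
  // later shut it down, as a Java API's close()/shutdown() hook might require.
  def run(): String = {
    val q = Queue.bounded[IO, Int](16).unsafeRunSync()

    // Starts the stream on the IO runtime and returns a cancel function.
    val cancel: () => Future[Unit] =
      Stream
        .fromQueueUnterminated(q)
        .evalMap(i => IO.println(s"got $i"))
        .compile
        .drain
        .unsafeRunCancelable()

    // Producer side, as send() would do it: tryOffer never waits for space.
    (1 to 3).foreach(i => q.tryOffer(i).unsafeRunSync())

    Thread.sleep(100) // give the consumer a moment to print (demo only)

    // Cancel the consumer and wait until cancellation has completed.
    Await.result(cancel(), 5.seconds)
    "stopped"
  }

  def main(args: Array[String]): Unit = println(run())
}
```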

Sample answered 8/1, 2021 at 19:6
That's pretty much what I ended up doing, what is described in the other answer, and what I originally described in my question as something which I thought wouldn't work :) but apparently that is the correct way to do it, and it does work. Thanks anyway! - Slavism
