How to use Flow.map in akka client side websocket in akka streams
Asked Answered
C

0

1

I have a WebSocket server I need to send and receive messages to it over the established WebSocket connection as Akka client-side WebSocket does not provide us the conventional websocket.send() feature as in java

so I found this solution How to use Akka-HTTP client websocket send message

which works great but I am a beginner in Akka stream I am trying to achieve the following scenario

example in my case WebSocket server is running at ws://0.0.0.0:8188

first, I will send a message to the server for initiating the sessionID

request# 1

{
        "janus" : "create",
        "transaction" : "<random alphanumeric string>"
}

the server will respond with the session id

response #1 
{
   "janus": "success",
   "session_id": 2630959283560140,
   "transaction": "asqeasd4as3d4asdasddas",
   "data": {
        "id": 4574061985075210
   }
}

then based on id 4574061985075210 I will send another message and receive further info

    request # 02 {
 "janus": "attach","session_id":${sessionId},"plugin":"janus.plugin.echotest","transaction":"asqeasd4as3d4asdasddas"     
}

response # 02 {
}
----

so far I can display the response of request #1 but I don't know how to initiate request #2 with sessionID I got from request #1 and display its response

here is my code

    def main(args: Array[String]): Unit = {
        val url = "ws://0.0.0.0:8188"
        val req = WebSocketRequest(url, Nil, Option("janus-protocol"))
    
      

    implicit val materializer = ActorMaterializer()

    import system.dispatcher

    val webSocketFlow = Http().webSocketClientFlow(req)

    val messageSource: Source[Message, ActorRef] =
      Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    val messageSink: Sink[Message, NotUsed] =
      Flow[Message]
        .map{message =>
          println(s"Received text message: [$message]")
          val strJson = message.toString
          val jsonResponse = strJson.parseJson
          val jsonObj = jsonResponse.asJsObject
          val janus = jsonObj.fields("janus").convertTo[String]
          val data = jsonObj.fields("data").asJsObject
          val sessionID = data.fields("id")
          // i need to take this SessionId and send to the websocket established connection and receive its response
        }
        .to(Sink.ignore)

    val ((ws, upgradeResponse), closed) =
      messageSource
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(messageSink)(Keep.both)
        .run()

    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    val source =
      """{ "janus": "create", "transaction":"d1403sa54a5s3d4as3das"}"""
    val jsonAst = source.parseJson


    ws ! TextMessage.Strict(jsonAst.toString())
   
  }

any help would be appreciated Thanks in advance

Cassondra answered 20/8, 2020 at 13:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.