Dealer (Server) to Dealer (Worker) Not Working
Asked Answered
S

1

6

In the following DEALER to DEALER connection, the Worker DEALER sends [][Foo!], i.e. a 2-frame message, to the Server DEALER.

package net.async

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Socket

class Worker(name: String) extends Runnable {

  import DealerToDealer._

  val WorkerResponse = "Foo!".getBytes

  val id = name.getBytes

  override def run(): Unit = {
    val context = ZMQ.context(1)
    val worker = context.socket(ZMQ.DEALER)
    worker.setIdentity(id)
    worker.connect(s"tcp://localhost:$Port")
    runHelper(worker)
  }

  private def runHelper(worker: Socket): Unit = {
    println(s"Worker ${name}: sending Empty + ${show(WorkerResponse)}.")
    worker.send(Empty, ZMQ.SNDMORE)
    worker.send(WorkerResponse, 0)
    println(s"Worker ${name}: sent")
    val message = show(worker.recv(0))
    println(s"Worker ${show(worker.getIdentity)}: received message: $message.")
    runHelper(worker)
  }
}

object DealerToDealer {

  def show(xs: Array[Byte]): String =
    new String(xs)

  val Port = 5555

  val Empty = "".getBytes

  val WorkerMessage = "Bar!".getBytes
  val WorkLimit = 25

  def main(xs: Array[String]): Unit = {
    val context = ZMQ.context(1)
    val server = context.socket(ZMQ.DEALER)
    new Thread(new Worker("Bob")).run()
    server.bind(s"tcp://*:$Port")
    runHelper(server, 0)
  }

  private def runHelper(server: Socket, sent: Int): Unit = {
    if (sent > WorkLimit ) {
      println(s"SERVER: sent the '$WorkerMessage' message ${WorkLimit} times. shutting down.")
      sys.exit(0)
    }
    else {
      println(s"SERVER: sending Empty + ${show(WorkerMessage)}.")
      server.send(Empty, ZMQ.SNDMORE)
      server.send(WorkerMessage, 0)
      println(s"SERVER: sent")
      val message = server.recv(0)
      println(s"SERVER: received message: ${show(message)}.")
      runHelper(server, sent + 1)
    }
  }
}

However, when running it, only the print statements show the sending of the Worker's message. The actual receipt of the Server is not shown

[info] Running net.async.DealerToDealer
[info] Worker Bob: sending Empty + Foo!.
[info] Worker Bob: sent

Why doesn't the Server receive this message?

Also, is the [][Message] format correct for DEALER to DEALER communication?

Shul answered 15/8, 2016 at 1:40 Comment(0)
G
1

Yes the format of [][message] is correct, but remember you need to read (everything up to the) empty frame and then your message.

Girardi answered 17/8, 2016 at 19:53 Comment(2)
I start my worker on a new thread: new Thread(new Worker("Bob")).run(). How can I read everything up to the empty frame and my message? Simply do two dealerServer.recv(0)'s to receieve [], and then [message]?Shul
Oops, so you did. just receive frames in a while( ! empty ) loop, then read the message content after the while loop. Although as you get no debug out from the 'server' this may not be the problemGirardi

© 2022 - 2024 — McMap. All rights reserved.