i think I found a trick that works for me to check if a socket is connected. The best solution to your client side is to create a socket poller and poll on the pull socket until a message is received. This avoids wasteful sleeps, and makes for generally tighter code:
Here is the code that do the works:
private void blockUntilConnected() {
ZMQ.Poller poller = ctx.createPoller(1);
poller.register(this.subscriber, ZMQ.Poller.POLLIN);
int rc = -1;
while (rc == -1) {
rc = poller.poll(1000);
}
poller.pollin(0);
}
I will also supply the full source code:
Server Part:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.net.InetSocketAddress;
import java.net.Socket;
import static io.Adrestus.config.ConsensusConfiguration.*;
public class ConsensusServer {
private static Logger LOG = LoggerFactory.getLogger(ConsensusServer.class);
private final ZContext ctx;
private final String IP;
private final ZMQ.Socket publisher;
private final ZMQ.Socket collector;
public ConsensusServer(String IP) {
this.IP = IP;
this.ctx = new ZContext();
this.publisher = ctx.createSocket(SocketType.PUB);
this.publisher.setHeartbeatIvl(2);
this.collector = ctx.createSocket(SocketType.PULL);
this.publisher.bind("tcp://" + IP + ":" + PUBLISHER_PORT);
this.collector.bind("tcp://" + IP + ":" + COLLECTOR_PORT);
this.collector.setReceiveTimeOut(CONSENSUS_TIMEOUT);
this.publisher.setSendTimeOut(CONSENSUS_TIMEOUT);
}
public ConsensusServer() {
this.IP = findIP();
this.ctx = new ZContext();
this.publisher = ctx.createSocket(SocketType.PUB);
this.collector = ctx.createSocket(SocketType.PULL);
this.publisher.bind("tcp://" + IP + ":" + PUBLISHER_PORT);
this.collector.bind("tcp://" + IP + ":" + COLLECTOR_PORT);
this.publisher.setSendTimeOut(CONSENSUS_TIMEOUT);
this.collector.setReceiveTimeOut(CONSENSUS_TIMEOUT);
}
private String findIP() {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("google.com", 80));
return socket.getLocalAddress().getHostAddress();
} catch (Exception e) {
e.printStackTrace();
}
throw new IllegalArgumentException("Make sure you intern connection is working");
}
public void publishMessage(byte[] data) {
publisher.send(data, 0);
}
public byte[] receiveData() {
byte[] data = null;
try {
data = collector.recv();
} catch (Exception e) {
LOG.info("Socket Closed");
}
return data;
}
public static Logger getLOG() {
return LOG;
}
public static void setLOG(Logger LOG) {
ConsensusServer.LOG = LOG;
}
public ZContext getCtx() {
return ctx;
}
public String getIP() {
return IP;
}
public ZMQ.Socket getPublisher() {
return publisher;
}
public ZMQ.Socket getCollector() {
return collector;
}
public void close() {
this.publisher.close();
this.collector.close();
this.ctx.close();
}
}
Client Part:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import static io.Adrestus.config.ConsensusConfiguration.*;
public class ConsensusClient {
private static Logger LOG = LoggerFactory.getLogger(ConsensusClient.class);
private final String IP;
private ZContext ctx;
private final ZMQ.Socket subscriber;
private final ZMQ.Socket push;
public ConsensusClient(String IP) {
this.ctx = new ZContext();
this.IP = IP;
this.subscriber = ctx.createSocket(SocketType.SUB);
this.push = ctx.createSocket(SocketType.PUSH);
this.subscriber.connect("tcp://" + IP + ":" + SUBSCRIBER_PORT);
this.subscriber.subscribe(ZMQ.SUBSCRIPTION_ALL);
this.subscriber.setReceiveTimeOut(CONSENSUS_TIMEOUT);
blockUntilConnected();
this.push.connect("tcp://" + IP + ":" + COLLECTOR_PORT);
}
private void blockUntilConnected() {
ZMQ.Poller poller = ctx.createPoller(1);
poller.register(this.subscriber, ZMQ.Poller.POLLIN);
int rc = -1;
while (rc == -1) {
rc = poller.poll(1000);
}
poller.pollin(0);
}
public void pushMessage(byte[] data) {
push.send(data);
}
public byte[] receiveData() {
byte[] data = subscriber.recv();
return data;
}
public void close() {
this.subscriber.close();
this.push.close();
this.ctx.close();
}
}
Main part: (Notice that the client is first initialized and it's blocked until a server is started and connected. You can simply add a timeout if you don't want to hang on forever)
import java.nio.charset.StandardCharsets;
public class CustomTest {
public static void main(String[] args) {
//client already started and block until server is connected
(new Thread() {
public void run() {
ConsensusClient Client = new ConsensusClient("localhost");
while (true) {
byte[] res = Client.receiveData();
System.out.println(new String(res));
}
}
}).start();
Thread.sleep(3000);
//server started
ConsensusServer Server = new ConsensusServer("localhost");
Thread.sleep(100);
Server.publishMessage("Message".getBytes(StandardCharsets.UTF_8));
Server.publishMessage("Message".getBytes(StandardCharsets.UTF_8));
Server.publishMessage("Message".getBytes(StandardCharsets.UTF_8));
Thread.sleep(10000);
Server.close();
}
}