Scenario :
We were evaluating ZeroMQ
(specifically jeroMq
) for an event driven mechanism.
The application is distributed where multiple services (both publishers and subscribers are services) can exist either in the same jvm or in distinct nodes, which depends on the deployment architecture.
Observation
For playing around I created a pub
/sub
pattern with inproc:
as the transport , using jero mq (version :0.3.5)
- The thread publishing is able to publish (looks like getting published, at least no errors)
- The subscriber which is in another thread is not receiving anything.
Question
Is using inproc:
along with pub
/sub
feasible?
Tried googling but couldn't find anything specific, any insights?
Code sample for pub
/sub
with inproc:
The working code sample for inproc pub sub using jero mq (version :0.3.5), would be useful for someone later visiting this post. One publisher publishing topics A
and B
, and two subscribers receiving A
and B
separately
/**
* @param args
*/
public static void main(String[] args) {
// The single ZMQ instance
final Context context = ZMQ.context(1);
ExecutorService executorService = Executors.newFixedThreadPool(3);
//Publisher
executorService.execute(new Runnable() {
@Override
public void run() {
startPublishing(context);
}
});
//Subscriber for topic "A"
executorService.execute(new Runnable() {
@Override
public void run() {
startFirstSubscriber(context);
}
});
// Subscriber for topic "B"
executorService.execute(new Runnable() {
@Override
public void run() {
startSecondSubscriber(context);
}
});
}
/**
* Prepare the publisher and publish
*
* @param context
*/
private static void startPublishing(Context context) {
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("inproc://test");
while (!Thread.currentThread().isInterrupted()) {
// Write two messages, each with an envelope and content
try {
publisher.sendMore("A");
publisher.send("We don't want to see this");
LockSupport.parkNanos(1000);
publisher.sendMore("B");
publisher.send("We would like to see this");
} catch (Throwable e) {
e.printStackTrace();
}
}
publisher.close();
context.term();
}
/**
* Prepare and receive through the subscriber
*
* @param context
*/
private static void startFirstSubscriber(Context context) {
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber1 " + address + " : " + contents);
}
subscriber.close();
context.term();
}
/**
* Prepare and receive though the subscriber
*
* @param context
*/
private static void startSecondSubscriber(Context context) {
// Prepare our context and subscriber
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("inproc://test");
subscriber.subscribe("A".getBytes());
while (!Thread.currentThread().isInterrupted()) {
// Read envelope with address
String address = subscriber.recvStr();
// Read message contents
String contents = subscriber.recvStr();
System.out.println("Subscriber2 " + address + " : " + contents);
}
subscriber.close();
context.term();
}