I have around 60 sockets and 20 threads, and I want to make sure each thread works on a different socket every time, so I don't want to share the same socket between two threads at all.
In my SocketManager class, I have a background thread which runs every 60 seconds and calls the updateLiveSockets() method. In updateLiveSockets(), I iterate over all the sockets I have and ping them one by one by calling the send method of the SendToQueue class, and based on the response I mark each one as live or dead. This method always needs to iterate over all the sockets and ping them to check whether they are live or dead.
Now all the reader threads will call the getNextSocket() method of the SocketManager class concurrently to get the next live available socket and send the business message on it. So I have two types of messages which I am sending on a socket:
- One is the ping message. This is only sent from the timer thread calling the updateLiveSockets() method in the SocketManager class.
- The other is the business message. This is done in the SendToQueue class.
So if the pinger thread is pinging a socket to check whether it is live, no business thread should use that socket. Similarly, if a business thread is using a socket to send data, the pinger thread should not ping that socket. This applies to all the sockets. But I also need to make sure that in the updateLiveSockets method we ping all the available sockets whenever my background thread runs, so that we can figure out which sockets are live or dead.
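To make the constraint concrete, here is a minimal sketch of one way it could be expressed, assuming a per-socket lock is acceptable. GuardedSocket, trySend and ping are hypothetical names that do not exist in my code, and the type parameter S stands in for the real 0MQ socket type. Business threads use tryLock() so they never wait on a busy socket, while the pinger uses lock() so that it eventually reaches every socket.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical wrapper; S stands in for the real socket type.
final class GuardedSocket<S> {
    private final S socket;
    private final ReentrantLock lock = new ReentrantLock();
    private volatile boolean live = true;

    GuardedSocket(S socket) {
        this.socket = socket;
    }

    boolean isLive() {
        return live;
    }

    // Business threads: never wait. An empty result means the socket is
    // dead or currently owned by another thread, so the caller should try another one.
    Optional<Boolean> trySend(Predicate<? super S> sendAction) {
        if (!live || !lock.tryLock()) {
            return Optional.empty();
        }
        try {
            // Re-check: the pinger may have marked the socket dead in the meantime.
            if (!live) {
                return Optional.empty();
            }
            return Optional.of(sendAction.test(socket));
        } finally {
            lock.unlock();
        }
    }

    // Pinger thread: waits until the current user is done, then pings
    // with exclusive access and records the result.
    boolean ping(Predicate<? super S> pingAction) {
        lock.lock();
        try {
            live = pingAction.test(socket);
            return live;
        } finally {
            lock.unlock();
        }
    }
}
```

A business thread would walk the live sockets and take the first one for which trySend returns a value. Note that this only works if the same wrapper object is kept across ping cycles and its flag is updated in place, rather than a new holder being created for every ping.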
Below is my SocketManager class:
public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
            new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    // ...

    private SocketManager() {
        connectToZMQSockets();
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                updateLiveSockets();
            }
        }, 60, 60, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
        Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
        for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
            List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
            liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
        }
    }

    private List<SocketHolder> connect(List<String> paddes, int socketType) {
        List<SocketHolder> socketList = new ArrayList<>();
        // ....
        return socketList;
    }

    // this method will be called by multiple threads concurrently to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
        for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
            Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
            if (liveSocket.isPresent()) {
                return liveSocket;
            }
        }
        return Optional.absent();
    }

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
        if (!listOfEndPoints.isEmpty()) {
            // The list of live sockets
            List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
            for (SocketHolder obj : listOfEndPoints) {
                if (obj.isLive()) {
                    liveOnly.add(obj);
                }
            }
            if (!liveOnly.isEmpty()) {
                // The list is not empty, so pick one live socket at random
                return Optional.of(liveOnly.get(random.nextInt(liveOnly.size())));
            }
        }
        return Optional.absent();
    }

    // runs every 60 seconds to ping all the available sockets and check whether they are alive or not
    private void updateLiveSockets() {
        Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
        for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
            List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
            List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
            for (SocketHolder liveSocket : liveSockets) {
                Socket socket = liveSocket.getSocket();
                String endpoint = liveSocket.getEndpoint();
                Map<byte[], byte[]> holder = populateMap();
                Message message = new Message(holder, Partition.COMMAND);
                // pinging to see whether a socket is live or not
                boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
                SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
                liveUpdatedSockets.add(zmq);
            }
            liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
        }
    }
}
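Regarding the selection step in getLiveSocket() only (this does not address the exclusivity problem), here is a small sketch of the same "filter live entries, then pick one at random" logic. It assumes java.util.Optional instead of the Guava Optional used above, and LivePicker and pickRandom are hypothetical names. It uses ThreadLocalRandom so that the 20 reader threads do not contend on a single shared Random, and it treats a missing list as "no live socket" instead of failing with a NullPointerException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;

// Hypothetical helper; T stands in for SocketHolder.
final class LivePicker {
    private LivePicker() {
    }

    // Returns one random element accepted by isLive, or empty if there is none.
    static <T> Optional<T> pickRandom(List<T> candidates, Predicate<? super T> isLive) {
        if (candidates == null || candidates.isEmpty()) {
            return Optional.empty();
        }
        List<T> live = new ArrayList<>(candidates.size());
        for (T candidate : candidates) {
            if (isLive.test(candidate)) {
                live.add(candidate);
            }
        }
        if (live.isEmpty()) {
            return Optional.empty();
        }
        // Each thread uses its own generator, so there is no shared state here.
        int index = ThreadLocalRandom.current().nextInt(live.size());
        return Optional.of(live.get(index));
    }
}
```

Two threads can still be handed the same element by this method, so it would have to be combined with some form of ownership tracking to meet the requirement described above.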
And here is my SendToQueue class:
// this method will be called by multiple reader threads (around 20) concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
}

private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
        // log error
        return false;
    }
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
        // send data on a socket LINE A
        return msg.send(liveSocket.get().getSocket());
    } finally {
        msg.destroy();
    }
}

// called by the timer thread from updateLiveSockets() to ping a specific socket
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
        // two-argument overload of doSendAsync (not shown here)
        if (doSendAsync(m, socket)) {
            return m.waitForAck();
        }
        return false;
    } finally {
        cache.invalidate(address);
    }
}
Problem Statement
As you can see above, I am sharing the same socket between two threads. getNextSocket() in the SocketManager class could return a 0MQ socket to Thread A. Concurrently, the timer thread may access the same 0MQ socket to ping it. In this case Thread A and the timer thread are mutating the same 0MQ socket, which can lead to problems. So I am trying to find a way to prevent different threads from sending data on the same socket at the same time and mucking up my data.
One solution I can think of is using synchronization on a socket while sending the data, but if many threads use the same socket, resources aren't well utilized. Moreover, if msg.send(socket); blocks (technically it shouldn't), all threads waiting for this socket are blocked. So I guess there might be a better way than synchronizing on a particular socket to ensure that every thread uses a different live socket at any given time.
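As an illustration of what "a different live socket per thread" could look like without synchronizing on the socket itself, here is a minimal sketch of a check-out/check-in pool built on a BlockingQueue. SocketPool, borrow and giveBack are hypothetical names and S stands in for the real socket type. A socket is either idle in the queue or owned by exactly one thread, so two threads can never hold the same one.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical pool; S stands in for the real socket type.
final class SocketPool<S> {
    private final BlockingQueue<S> idle;

    SocketPool(Collection<? extends S> sockets) {
        this.idle = new LinkedBlockingQueue<>(sockets);
    }

    // Removes an idle socket from the pool and hands it to the caller.
    // Returns null if no socket became idle within the timeout.
    S borrow(long timeout, TimeUnit unit) throws InterruptedException {
        return idle.poll(timeout, unit);
    }

    // Must be called exactly once per successful borrow, typically in a finally block.
    void giveBack(S socket) {
        idle.add(socket);
    }

    int idleCount() {
        return idle.size();
    }
}
```

With about 60 sockets and 20 threads the queue would rarely be empty, so borrow should seldom wait. In this sketch the pinger would have to borrow sockets the same way and keep dead ones out of the queue until a later ping succeeds. That bookkeeping is not shown.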
[...] entrySet() and using map.get(entry.getKey()) instead of entry.getValue() and, even worse, use map.put(entry.getKey(), newValue) instead of entry.setValue(newValue) (in case of non-concurrent map, it could even break the iterator). But in case of concurrent updates, you surely want to use computeIfPresent to prevent interference from other updates. – Mcbride

boolean isLive = (status) ? true : false; is an obsolete statement. Its only effect is boolean isLive = status;, but why do you need two boolean variables for the same thing? – Mcbride

[...] entrySet(), you get a Map.Entry instance in each iteration, which allows you to read key and value and set the value without performing any lookups, so you should not perform lookups via get and put, as that's a waste of resources. In case of a non-concurrent map like HashMap, using put while iterating over it can break the operation. – Mcbride

[...] ConcurrentHashMap, where things are slightly different. You still shouldn't use get. But use either replace instead of put, to detect if you would overwrite a concurrently made update, or use computeIfPresent to avoid concurrent updates in the first place. – Mcbride

for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); … … … liveSocketsByDatacenter.put(entry.getKey(), …); – Mcbride

[...] keySet()). Your update loop is supposed to update those keys that actually exist in liveSocketsByDatacenter, and I guess, it should not skip any. So why not iterate over one map to update it? – Mcbride