Do not share the same socket between two threads at the same time
Y

7

13

I have around 60 sockets and 20 threads, and I want to make sure each thread works on a different socket every time. I don't want the same socket shared between two threads at all.

In my SocketManager class, I have a background thread which runs every 60 seconds and calls the updateLiveSockets() method. In updateLiveSockets(), I iterate over all the sockets I have and ping them one by one by calling the send method of the SendToQueue class, and based on the response I mark each one as live or dead. updateLiveSockets() always needs to iterate over all the sockets and ping them to check whether they are live or dead.

All the reader threads call the getNextSocket() method of the SocketManager class concurrently to get the next available live socket and send a business message on it. So I send two types of messages on a socket:

  • A ping message. This is only sent from the timer thread calling the updateLiveSockets() method in the SocketManager class.
  • A business message. This is sent from the SendToQueue class.

So if the pinger thread is pinging a socket to check whether it is live, no business thread should use that socket. Similarly, if a business thread is using a socket to send data, the pinger thread should not ping that socket. This applies to all the sockets. But I need to make sure that updateLiveSockets() pings all the available sockets whenever my background thread runs, so that we can figure out which sockets are live and which are dead.

Below is my SocketManager class:

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
      new ConcurrentHashMap<>();
  private final ZContext ctx = new ZContext();

  // ...

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets();
      }
    }, 60, 60, TimeUnit.SECONDS);
  }

  // during startup, making a connection and populate once
  private void connectToZMQSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
      liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
    }
  }

  private List<SocketHolder> connect(List<String> paddes, int socketType) {
    List<SocketHolder> socketList = new ArrayList<>();
    // ....
    return socketList;
  }

  // this method will be called by multiple threads concurrently to get the next live socket
  // is there any concurrency or thread safety issue or race condition here?
  public Optional<SocketHolder> getNextSocket() {
    for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
      Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
      if (liveSocket.isPresent()) {
        return liveSocket;
      }
    }
    return Optional.absent();
  }

  private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
    if (!listOfEndPoints.isEmpty()) {
      // The list of live sockets
      List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
      for (SocketHolder obj : listOfEndPoints) {
        if (obj.isLive()) {
          liveOnly.add(obj);
        }
      }
      if (!liveOnly.isEmpty()) {
        // The list is not empty, so return one live socket chosen at random
        return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
      }
    }
    return Optional.absent();
  }

  // runs every 60 seconds to ping all the available socket to make sure whether they are alive or not
  private void updateLiveSockets() {
    Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

    for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
      List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
      List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
      for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
        String endpoint = liveSocket.getEndpoint();
        Map<byte[], byte[]> holder = populateMap();
        Message message = new Message(holder, Partition.COMMAND);

        // pinging to see whether a socket is live or not
        boolean isLive = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
        SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
        liveUpdatedSockets.add(zmq);
      }
      liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
    }
  }
}

And here is my SendToQueue class:

  // this method will be called by multiple reader threads (around 20) concurrently to send the data
  public boolean sendAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // log error
      return false;
    }       
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // send data on a socket LINE A
      return msg.send(liveSocket.get().getSocket());
    } finally {
      msg.destroy();
    }
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

Problem Statement

As you can see above, I am sharing the same socket between two threads. getNextSocket() in the SocketManager class could return a 0MQ socket to Thread A. Concurrently, the timer thread may access the same 0MQ socket to ping it. In that case Thread A and the timer thread are mutating the same 0MQ socket, which can lead to problems. So I am trying to find a way to prevent different threads from sending data on the same socket at the same time and mucking up my data.

One solution I can think of is synchronizing on the socket while sending data, but if many threads use the same socket, resources aren't well utilized. Moreover, if msg.send(socket); blocks (technically it shouldn't), all threads waiting for this socket are blocked too. So I guess there might be a better way than synchronizing on a particular socket to ensure that every thread uses a different live socket at any given time.

Ylem answered 13/12, 2017 at 0:24 Comment(11)
Even for ordinary map operations, it’s an anti-pattern to iterate over the entrySet() and use map.get(entry.getKey()) instead of entry.getValue() and, even worse, map.put(entry.getKey(), newValue) instead of entry.setValue(newValue) (in the case of a non-concurrent map, it could even break the iterator). But in the case of concurrent updates, you surely want to use computeIfPresent to prevent interference from other updates.Mcbride
Btw., boolean isLive = (status) ? true : false; is a redundant statement. It is equivalent to boolean isLive = status;, but why do you need two boolean variables for the same thing?Mcbride
@Mcbride yeah I have already fixed your second statement. That was bad on my part. On your first suggestion, I didn't quite follow what you said.Ylem
When you iterate over an entrySet(), you get a Map.Entry instance in each iteration, which allows you to read the key and value and set the value without performing any lookups, so you should not perform lookups via get and put, as that’s a waste of resources. In the case of a non-concurrent map like HashMap, using put while iterating over it can break the operation.Mcbride
yeah make sense but where do you see me doing that? Just wondering.Ylem
The last point does not apply to ConcurrentHashMap, where things are slightly different. You still shouldn’t use get. But use either replace instead of put, to detect if you would overwrite a concurrently made update, or use computeIfPresent to avoid concurrent updates in the first place.Mcbride
In your first code, for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) { List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); … … … liveSocketsByDatacenter.put(entry.getKey(), …);Mcbride
Actually I am iterating a different map and doing a lookup on a different map but with the key from first map. Is that still a problem?Ylem
Well, it depends. The big question is, why are you iterating over a different map (by the way, if you only use the keys of a map, there’s keySet()). Your update loop is supposed to update those keys that actually exist in the liveSocketsByDatacenter, and I guess, it should not skip any. So why not iterate over one map to update it?Mcbride
Let us continue this discussion in chat.Ylem
@Ylem I keep thinking of clustering with your question, but WCF should handle this for you. This link may assist but I find it hard to understand why you want to not share the socket when out of the box supports reuse(always, never, whenduplex), learn.microsoft.com/en-us/dotnet/framework/wcf/feature-details/….Alceste
W
3

So I am trying to find a way so that I can prevent different threads from sending data to the same socket at the same time and mucking up my data.

There are certainly a number of different ways to do this. To me, a BlockingQueue seems like the right thing to use. The business threads would take a socket from the queue and would be guaranteed that no one else is using that socket.

private final BlockingQueue<SocketHolder> socketHolderQueue = new LinkedBlockingQueue<>();
...
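public Optional<SocketHolder> getNextSocket() {
   // poll() returns null when no socket is free; wrap it in Guava's Optional,
   // matching the Optional.absent() calls used in the question
   return Optional.fromNullable(socketHolderQueue.poll());
}
...
public void finishedWithSocket(SocketHolder holder) {
   // offer() never blocks on an unbounded LinkedBlockingQueue and, unlike put(),
   // does not throw InterruptedException
   socketHolderQueue.offer(holder);
}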

I think that synchronizing on the socket is not a good idea for the reasons that you mention: the ping thread would block the business threads.

There are a number of ways of handling the ping thread logic. I would store each Socket with a last-use time; your ping thread could then periodically take each of the sockets from the same BlockingQueue, test it, and put it back onto the end of the queue after testing.

public void testSockets() {
   // run this once for each socket currently in the queue
   int numTests = socketHolderQueue.size();
   for (int i = 0; i < numTests; i++) {
      SocketHolder holder = socketHolderQueue.poll();
      if (holder == null) {
         // the business threads have taken the remaining sockets
         break;
      }
      if (socketIsOk(holder)) {
          // offer() does not block or throw on an unbounded queue
          socketHolderQueue.offer(holder);
      } else {
          // close it here or something
      }
   }
}

You could also have the getNextSocket() code that dequeues the sockets check the timer, put any socket that is due for a test on a separate test queue for the ping thread, and then take the next one from the queue. The business threads would never be using the same socket at the same time as the ping thread.

Depending on when you want to test the sockets, you can also reset the timer when the business thread returns the socket to the queue, so the ping thread tests a socket only after X seconds of no use.
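
A minimal sketch of the last-use-time idea, under some assumptions: the socket type is a generic S, a Predicate stands in for the real ping/ACK round trip, and IdleCheckedPool, borrow, giveBack and testIdleSockets are hypothetical names that do not come from the question's code or from any library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical pool: a socket is either in the queue (idle) or held by exactly one thread.
class IdleCheckedPool<S> {
    // A pooled socket plus the time it was last returned or successfully pinged.
    private static final class Entry<S> {
        final S socket;
        final long lastUsedNanos;
        Entry(S socket, long lastUsedNanos) {
            this.socket = socket;
            this.lastUsedNanos = lastUsedNanos;
        }
    }

    private final BlockingQueue<Entry<S>> idle = new LinkedBlockingQueue<>();
    private final long maxIdleNanos;
    private final Predicate<S> ping; // assumption: stands in for the real ping

    IdleCheckedPool(Iterable<S> sockets, long maxIdleNanos, Predicate<S> ping) {
        this.maxIdleNanos = maxIdleNanos;
        this.ping = ping;
        long now = System.nanoTime();
        for (S s : sockets) {
            idle.offer(new Entry<>(s, now));
        }
    }

    // Business threads: returns null when every socket is currently borrowed.
    S borrow() {
        Entry<S> e = idle.poll();
        return e == null ? null : e.socket;
    }

    // Business threads: hand the socket back and restart its idle timer.
    void giveBack(S socket) {
        idle.offer(new Entry<>(socket, System.nanoTime()));
    }

    // Ping thread: test only sockets that have been idle for at least maxIdleNanos.
    // Returns the number of sockets dropped because the ping failed.
    int testIdleSockets() {
        int dead = 0;
        int n = idle.size();
        for (int i = 0; i < n; i++) {
            Entry<S> e = idle.poll();
            if (e == null) {
                break; // business threads took the rest
            }
            boolean stale = System.nanoTime() - e.lastUsedNanos >= maxIdleNanos;
            if (!stale) {
                idle.offer(e);                                     // recently used, skip the ping
            } else if (ping.test(e.socket)) {
                idle.offer(new Entry<>(e.socket, System.nanoTime())); // alive, restart timer
            } else {
                dead++;                                            // not returned to the pool
            }
        }
        return dead;
    }

    int idleCount() {
        return idle.size();
    }
}
```

A socket held by a business thread is skipped by the ping pass, so this sketch does not by itself guarantee that every socket is pinged on every run. It also drops dead sockets, whereas the question's code keeps them and re-checks them later.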

Wille answered 18/12, 2017 at 0:44 Comment(13)
Yeah, a BlockingQueue might be a good choice here, which means business threads will take a socket from the queue and then need to put it back once they are done using it so that other threads can take it. Right? As for the ping thread logic, I was not able to understand how it will work. If possible, can you provide an example based on my code to illustrate how both of these things will work? It will help me understand better.Ylem
Thanks for your update. I got an idea what needs to be done. I have few questions though. In my case, I have a Map of Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); so after your suggestion to use BlockingQueue, my map will become Map<Datacenters, LinkedBlockingQueue<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); I guess. And basis on this I have modified the code and updated in the question.Ylem
I have modified updateLiveSockets, getLiveSocket method but I am not sure how does my finishedWithSocket method will look like since I need to put that socket in a map of its corresponding dc? Also I need to make sure there is no race condition or thread safety issue in my SocketManager class. Can you check out my code in updated section and see I got everything right or there can be any improvements made?Ylem
Hrm. I can see a couple of ways to do it @john. The finishedWithSocket could take a DC argument or the SocketHolder could be updated with the DC.Wille
I see. Apart from that does everything looks good in my updated section of SocketManager code? Also looks like with this solution, in my updated updateLiveSockets method, I won't be iterating all the sockets because all the reader threads will be removing live sockets from the queue so there is some race condition because of which I won't be iterating all the sockets. Right?Ylem
You won't be iterating through all of the sockets, right. If you need to ping all of them then when they are returned to the queue and the timer has expired I would ping them immediately then @john. The whole point is not to ping them when they are being used by the business threads.Wille
Yes, I do need to iterate over all of the sockets and ping them. We check the liveness of every socket by pinging it so that we don't send on a socket that we know is down. The box hosting those sockets may have been down for a long time, so the liveness check tells us those sockets are already down and we shouldn't send any data on them. How will this timer-expired logic work here? I think that part is missing, which is why I am confused. Can you provide an example of that as well, based on my example? It will help clear up my doubt.Ylem
I had assumed that the business threads are using the sockets @Ylem when they are out of the queue so there is no way for the pinger to test them and if they are using them I see no reason for them to test the sockets because they will be sending/receiving messages through them. Maybe your problem is that you need to take sockets and release them back to the queue more frequently.Wille
Yeah that's true but my point is - if pinger thread is pinging a socket to check whether they are live or not then no other business thread should use that socket. Similarly if business thread is using a socket to send data on it, then pinger thread should not ping that socket. And this applies to all the socket. But I need to make sure that in updateLiveSockets method, we are pinging all the available sockets whenever my background thread starts so that we can figure out which socket is live or dead.Ylem
Right now the problem is that if I synchronize on a socket, all the business threads that pick the same socket block on it while sending data, and I am trying to avoid that. At the same time, I need to make sure that my pinger thread pings all the available sockets to check whether they are live or dead.Ylem
I would suggest that either the socket is being used for business or pinging – not both. As my answer says, locking on the socket is going to not work. I would suggest that the business sections only pull the sockets off the use queue for short periods of time. So the only time the sockets will need to be pinged is when they are dormant in the queue. @YlemWille
yes that makes sense but how do I guarantee that I am pinging all the available sockets every 60 seconds to make sure they are live or dead? With the approach we have, I am not sure it guarantees that.Ylem
Let us continue this discussion in chat.Wille
T
3

It looks like you should consider using the try-with-resources feature here. You have the SocketHolder or Option class implement the AutoCloseable interface. For instance, let us assume that Option implements this interface. The Option close method then adds the instance back to the container. I created a simple example that shows what I mean. It is not complete, but it gives you an idea of how to implement this in your code.

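import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ObjectManager implements AutoCloseable {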

    private static class ObjectManagerFactory {
        private static ObjectManager objMgr = new ObjectManager();
    }

    private ObjectManager() {}

    public static ObjectManager getInstance() { return ObjectManagerFactory.objMgr; }

    private static final int SIZE = 10;

    private static BlockingQueue<AutoCloseable> objects = new LinkedBlockingQueue<AutoCloseable>();

    private static ScheduledExecutorService sch;
    static {
        for(int cnt = 0 ; cnt < SIZE ; cnt++) {
            objects.add(new AutoCloseable() {

                @Override
                public void close() throws Exception {
                    System.out.println(Thread.currentThread() + " - Adding object back to pool:" + this + " size: " + objects.size());
                    objects.put(this);
                    System.out.println(Thread.currentThread() + " - Added object back to pool:" + this);
                }

            });
        }
        sch = Executors.newSingleThreadScheduledExecutor();
        sch.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                updateObjects();
            }

        }, 10, 10, TimeUnit.MICROSECONDS);
    }

    static void updateObjects() {
        for(int cnt = 0 ; ! objects.isEmpty() && cnt < SIZE ; cnt++ ) {
            try(AutoCloseable object = objects.take()) {
                System.out.println(Thread.currentThread() + " - updateObjects - updated object: " + object + " size: " + objects.size());
            } catch (Throwable t) {
                // TODO Auto-generated catch block
                t.printStackTrace();
            }
        }
    }

    public AutoCloseable getNext() {
        try {
            return objects.take();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) {
        try (ObjectManager mgr = ObjectManager.getInstance()) {

            for (int cnt = 0; cnt < 5; cnt++) {
                try (AutoCloseable o = mgr.getNext()) {
                    System.out.println(Thread.currentThread() + " - Working with " + o);
                    Thread.sleep(1000);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        } catch (Throwable tt) {
            tt.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        // TODO Auto-generated method stub
        ObjectManager.sch.shutdownNow();
    }
}
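
Applied to a socket pool, the same try-with-resources idea might look like the sketch below. It assumes a generic socket type S; SocketLease and LeasingPool are hypothetical names introduced for illustration, not classes from the question or from any library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical lease: gives one thread exclusive use of a socket until close() runs.
final class SocketLease<S> implements AutoCloseable {
    private final S socket;
    private final BlockingQueue<S> pool;
    private boolean released; // a lease is meant to be used by a single thread

    SocketLease(S socket, BlockingQueue<S> pool) {
        this.socket = socket;
        this.pool = pool;
    }

    S socket() {
        return socket;
    }

    // Declares no checked exception, so callers need no catch block.
    @Override
    public void close() {
        if (!released) {
            released = true;
            pool.offer(socket); // return the socket exactly once
        }
    }
}

// Hypothetical pool shared by the business threads and the ping thread.
final class LeasingPool<S> {
    private final BlockingQueue<S> free = new LinkedBlockingQueue<>();

    LeasingPool(Iterable<S> sockets) {
        for (S s : sockets) {
            free.offer(s);
        }
    }

    // Blocks until a socket is free; the caller must close the returned lease.
    SocketLease<S> acquire() {
        try {
            return new SocketLease<>(free.take(), free);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for a socket", e);
        }
    }

    int available() {
        return free.size();
    }
}
```

A business thread would then write `try (SocketLease<S> lease = pool.acquire()) { ... }` around its send, and the socket goes back to the pool even if the send throws. The ping thread can use the same pattern, so it never holds a socket that a business thread is using.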
Tagmeme answered 21/12, 2017 at 14:53 Comment(2)
Thanks for your suggestion. I am still not sure how this will work with my code. If possible, can you provide an example based on my code showing how this will work?Ylem
hey thanks for the help. Let's discuss this here as it will be easier to discuss on implementation.Ylem
A
2

I will make some points here. In the getNextSocket method, getOrderedDatacenters will always return the same ordered list, so you will always pick from the same datacenters from start to end (that's not a problem).

How do you guarantee that two threads won't get the same liveSocket from getNextSocket?

What you are saying here is true:

Concurrently, the timer thread may access the same 0MQ socket to ping it.

I think the main problem here is that you don't distinguish between free sockets and reserved sockets.

One option, as you said, is to synchronize on each socket. Another option is to keep a list of reserved sockets and, when you want to get the next socket or to update sockets, pick only from the free sockets. You don't want to update a socket which is already reserved.

Also, you can take a look here to see if it fits your needs.

Apian answered 15/12, 2017 at 22:49 Comment(1)
I see. I am a little bit confused about how this approach will work here. Can you provide an example based on my code showing how this will work out? It will help me understand better.Ylem
T
1

There's a concept in operating systems called the critical section. A critical section arises when two or more concurrently executing processes share data: no process should modify or even read the shared data while another process is accessing it. So when a process enters the critical section, every other process that wants to enter it must block and wait. Which waiting process gets in next is a separate problem, process scheduling, and the operating system handles that for you.

So the best solution for you is a semaphore whose value is the number of sockets. In your case, I think you have one socket, so you would use a binary semaphore initialized to 1. Your code should then be divided into four main sections: critical section entry, the critical section, critical section exit, and the remainder section.

  • Critical section entry: where a process enters the critical section and blocks all other processes. The semaphore allows one process (thread) to enter the critical section (use a socket), and the semaphore value is decremented to zero.
  • The critical section: the code that the process runs while it holds the socket.
  • Critical section exit: the process releases the critical section so another process can enter. The semaphore value is incremented back to 1, allowing another process to enter.
  • Remainder section: the rest of your code, excluding the previous three sections.

Now all you need is to open any Java tutorial about semaphores to see how to apply one in Java; it's really easy.
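
A rough sketch of the semaphore approach with many sockets. A semaphore only counts permits and does not say which socket a thread gets, so this sketch pairs it with a concurrent queue of free sockets; that pairing is an assumption added here, not part of the answer. SemaphorePool and its method names are hypothetical.

```java
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical pool: one permit per socket, plus a queue holding the free sockets.
class SemaphorePool<S> {
    private final Semaphore permits;
    private final Queue<S> free = new ConcurrentLinkedQueue<>();

    SemaphorePool(Collection<S> sockets) {
        free.addAll(sockets);
        permits = new Semaphore(sockets.size(), true); // fair: waiters are served in order
    }

    // Critical section entry: wait up to the timeout for a permit, then take a socket.
    // Returns null on timeout or interruption.
    S acquire(long timeout, TimeUnit unit) {
        try {
            if (!permits.tryAcquire(timeout, unit)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
        // Holding a permit means at least one socket is in the queue,
        // because release() adds the socket before releasing the permit.
        return free.poll();
    }

    // Critical section exit: return the socket first, then free the permit.
    void release(S socket) {
        free.add(socket);
        permits.release();
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With 60 sockets and 20 threads the semaphore would start at 60, so business threads would rarely wait; the ping thread would go through the same acquire and release calls as everyone else.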

Tuatara answered 13/12, 2017 at 2:39 Comment(2)
I don't have one socket, I have around 60 sockets and 20 threads.Ylem
The same concept, bro: just initialize the semaphore with however many sockets you use and all threads will be distributed among these sockets. No thread will block as long as the semaphore value is > 0, and no two threads will use the same socket.Tuatara
P
1

Mouhammed Elshaaer is right, but in addition you can also use any concurrent collection, for example a ConcurrentHashMap in which you track that each thread works on a different socket (for example, key: socket hash code, value: thread hash code or something else). To me it's a somewhat crude solution, but it can be used too.
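
One way to sketch this tracking idea is with ConcurrentHashMap.putIfAbsent, which claims a socket atomically. Here the socket object itself is the key rather than its hash code, to sidestep hash collisions; that substitution, the generic type S, and the name OwnershipTable are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical ownership table: maps each claimed socket to the thread that holds it.
class OwnershipTable<S> {
    private final ConcurrentHashMap<S, Thread> owners = new ConcurrentHashMap<>();

    // Atomically claims the first unowned socket for the calling thread,
    // or returns null if every socket already has an owner.
    S claimAny(List<S> sockets) {
        Thread me = Thread.currentThread();
        for (S s : sockets) {
            // putIfAbsent returns null only for the thread that wins the claim
            if (owners.putIfAbsent(s, me) == null) {
                return s;
            }
        }
        return null;
    }

    // Releases the socket only if the calling thread is its current owner.
    boolean release(S socket) {
        return owners.remove(socket, Thread.currentThread());
    }
}
```

Each claim scans the list from the start, which is fine for around 60 sockets but does not spread load the way the question's random pick does.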

Pietro answered 15/12, 2017 at 8:20 Comment(1)
This should be a commentArmin
A
0

For the problem of threads (Thread A and the timer thread) accessing the same socket, I would keep two socket lists for each datacenter:

  • list A: the sockets that are not in use
  • list B: the sockets that are in use

i.e.,

  • call a synchronized getNextSocket() to find a not-in-use socket in list A, remove it from list A and add it to list B;
  • call a synchronized returnSocket(Socket) upon receiving the response/ACK for a sent message (either business or ping), to move the socket from list B back to list A. Put a try {} finally {} block around "sending message" to make sure that the socket is put back in list A even if there is an exception.
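
The two steps above can be sketched as follows for a single datacenter. The generic type S, the class name TwoListPool, and the withSocket helper (which takes a Predicate standing in for the real send) are hypothetical and only illustrate the try/finally shape.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical pool for one datacenter, guarded by the object's own monitor.
class TwoListPool<S> {
    private final Deque<S> notInUse = new ArrayDeque<>(); // list A
    private final Set<S> inUse = new HashSet<>();         // list B

    TwoListPool(Collection<S> sockets) {
        notInUse.addAll(sockets);
    }

    // Moves one socket from list A to list B; returns null if none is free.
    synchronized S getNextSocket() {
        S s = notInUse.pollFirst();
        if (s != null) {
            inUse.add(s);
        }
        return s;
    }

    // Moves the socket from list B back to list A; ignores sockets not in list B.
    synchronized void returnSocket(S s) {
        if (inUse.remove(s)) {
            notInUse.addLast(s);
        }
    }

    // Runs one send (business or ping) with a reserved socket and always returns it.
    boolean withSocket(Predicate<S> send) {
        S s = getNextSocket();
        if (s == null) {
            return false; // nothing free right now
        }
        try {
            return send.test(s);
        } finally {
            returnSocket(s); // runs even if send throws
        }
    }

    synchronized int inUseCount() {
        return inUse.size();
    }

    synchronized int freeCount() {
        return notInUse.size();
    }
}
```

The lock is held only while a socket moves between the lists, not during the send itself, so threads working on different sockets do not block each other.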
Arillode answered 22/12, 2017 at 0:10 Comment(0)
G
-2

I have a simple solution that may help you. I don't know if in Java you can add a custom attribute to each socket. In Socket.io you can. So I will assume you can (I will delete this answer if not).

You add a boolean attribute called locked to each socket. When your thread takes the first socket, it sets locked to true. Any other thread that wants to ping or use THIS socket first checks whether locked is false. If it is not, it calls getNextSocket.

So, in this stretch below...

...
for (SocketHolder liveSocket : liveSockets) {
        Socket socket = liveSocket.getSocket();
...

You check whether the socket is locked. If it is, kill this thread, interrupt it, or go to the next socket. (I don't know what you call it in Java.)

So the process is...

  • Thread gets an unlocked socket.
  • Thread sets this socket.locked to true.
  • Thread pings the socket and does whatever you want.
  • Thread sets this socket.locked to false.
  • Thread goes to the next socket.

Sorry for my bad English :)
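
In Java, a plain boolean field checked and then set in two steps would be racy, since two threads could both read false before either writes true. The sketch below swaps in AtomicBoolean.compareAndSet so that the check and the set happen as one step; that substitution is mine, and LockableSocket and LockedFlagSelector are hypothetical names wrapping a generic socket type S.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper that adds the "locked" attribute to a socket.
class LockableSocket<S> {
    private final S socket;
    private final AtomicBoolean locked = new AtomicBoolean(false);

    LockableSocket(S socket) {
        this.socket = socket;
    }

    S socket() {
        return socket;
    }

    // Succeeds for exactly one thread: flips locked from false to true atomically.
    boolean tryLock() {
        return locked.compareAndSet(false, true);
    }

    // Must be called by the thread that locked the socket, ideally in a finally block.
    void unlock() {
        locked.set(false);
    }
}

class LockedFlagSelector {
    // Returns the first socket this thread managed to lock, or null if all are locked.
    static <S> LockableSocket<S> firstUnlocked(List<LockableSocket<S>> sockets) {
        for (LockableSocket<S> s : sockets) {
            if (s.tryLock()) {
                return s;
            }
            // locked by another thread: go to the next socket
        }
        return null;
    }
}
```

With this shape the ping thread skips any socket a business thread holds, so, as the question notes for similar designs, a ping pass may not cover every socket.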

Germanism answered 21/12, 2017 at 20:22 Comment(1)
A socket that's in use shouldn't be in the pool at all.Incurvate
