How to ensure that `methodB` is "blocked" if some threads are in `methodA` in Java?

Class clazz has two methods methodA() and methodB().

How to ensure that methodB is "blocked" if some threads are in methodA in Java (I am using Java 8)?

By "blocking methodB", I mean that "wait until no threads are in methodA()". (Thanks to @AndyTurner)

Note that the requirement above allows the following situations:

  1. Multiple threads are simultaneously in methodA.
  2. Multiple threads are in methodB while no threads are in methodA.
  3. Threads in methodB do not prevent other threads from entering methodA.

My attempt: I use StampedLock lock = new StampedLock().

  • In methodA, call long stamp = lock.readLock()
  • Create a new method unlockB and call lock.unlockRead(stamp) in it.
  • In methodB, call long stamp = lock.writeLock() and lock.unlockWrite(stamp).

However, this locking strategy disallows the second and the third situations above.
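One possible variation on the attempt above, offered only as a sketch: if methodB releases the write lock immediately after acquiring it, the lock acts as a gate rather than as mutual exclusion. The class name StampedGate and the Runnable body parameters are assumptions made for illustration and are not part of the original code.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical wrapper: the Runnable stands in for the real method body.
class StampedGate {
    private final StampedLock lock = new StampedLock();

    void methodA(Runnable body) {
        long stamp = lock.readLock();   // several threads may hold a read stamp at once
        try {
            body.run();
        } finally {
            lock.unlockRead(stamp);     // released even if body throws
        }
    }

    void methodB(Runnable body) {
        long stamp = lock.writeLock();  // blocks while any thread holds a read stamp
        lock.unlockWrite(stamp);        // released at once, so methodB excludes nobody
        body.run();
    }
}
```

Because the write lock is never held while the body runs, several threads can be in methodB together and they do not keep threads out of methodA. StampedLock does not consistently prefer readers or writers, so a thread waiting in methodB may delay new callers of methodA until it gets through.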


Edit: I realize that I have not clearly specified the requirements of the synchronization between methodA and methodB. The approach given by @JaroslawPawlak works for the current requirement (I accept it), but not for my original intention (maybe I should first clarify it and then post it in another thread).

Dur answered 28/9, 2016 at 8:52 Comment(10)
Looks nasty because it contains data race between the threads that are still finishing work in methodB while new threads are entering methodA. Are you sure that's what you want and there is no way to distinguish some particular area which needs to be mutually exclusive in methodA and methodB while other parts could be executed concurrently?Mortgagee
Conditions 2 and 3 conflict with each other. What happens when one thread gets into B while no one is in A, and then some thread enters A while the first thread is still in B?Lambda
@Mortgagee That is what I want now. I want to achieve such a fine-grained synchronization because I am implementing a distributed application in which methodA and methodB involve remote calls and may result in a long delay. Thanks.Dur
@SupunWijerathne The situation you describe is allowed. The only thing that is not allowed is that new threads call methodB while some threads are already in methodA.Dur
Do methodA() or methodB() call themselves (or the other one) directly or indirectly?Lurleen
@AndyTurner No.Dur
What action should be taken if methodB() is called whilst other threads are in methodA()? Immediate return, block, exception etc?Lurleen
@AndyTurner This is not allowed. By "no methodB() can be called while ...", I mean the call to methodB() should be blocked.Dur
"Not allowed" implies an exception to me. By "block", do you mean "wait until no threads are in methodA()"?Lurleen
@AndyTurner Yes. I will update the problem. Thanks.Dur
G
4

I think this can do the trick:

private final Lock lock = new ReentrantLock();
private final Semaphore semaphore = new Semaphore(1);
private int threadsInA = 0;

public void methodA() {
    lock.lock();
    threadsInA++;
    semaphore.tryAcquire();
    lock.unlock();

    // your code

    lock.lock();
    threadsInA--;
    if (threadsInA == 0) {
        semaphore.release();
    }
    lock.unlock();
}

public void methodB() throws InterruptedException {
    semaphore.acquire();
    semaphore.release();

    // your code
}

Threads entering methodA increase the count and try to acquire a permit from semaphore (i.e. they take 1 permit if available, but if not available they just continue without a permit). When the last thread leaves methodA, the permit is returned. We cannot use AtomicInteger since changing the count and acquiring/releasing permit from semaphore must be atomic.

Threads entering methodB need to have a permit (and will wait for one if not available), but after they get it they return it immediately, allowing other threads to enter methodB.
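The same counting idea can also be sketched without a semaphore: the counter is guarded by the lock, and threads entering methodB wait on a Condition until the counter is zero. This is a swapped-in variation rather than the code above; the names CountingGate and noneInA and the Runnable body parameters are assumptions made for illustration.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper: the Runnable stands in for the real method body.
class CountingGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition noneInA = lock.newCondition();
    private int threadsInA = 0; // guarded by lock

    void methodA(Runnable body) {
        lock.lock();
        try {
            threadsInA++;
        } finally {
            lock.unlock();
        }
        try {
            body.run();
        } finally {
            lock.lock();
            try {
                if (--threadsInA == 0) {
                    noneInA.signalAll(); // last thread out wakes every waiter in methodB
                }
            } finally {
                lock.unlock();
            }
        }
    }

    void methodB(Runnable body) throws InterruptedException {
        lock.lock();
        try {
            while (threadsInA > 0) {     // loop also covers spurious wakeups
                noneInA.await();
            }
        } finally {
            lock.unlock();
        }
        body.run();                      // runs outside the lock, so B excludes nobody
    }
}
```

Since the check and the wait both happen while holding the lock, a thread in methodB cannot miss the wake-up from the last thread leaving methodA.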

EDIT:

Another simpler version:

private final int MAX_THREADS = 1_000;
private final Semaphore semaphore = new Semaphore(MAX_THREADS);

public void methodA() throws InterruptedException {
    semaphore.acquire();

    // your code

    semaphore.release();
}

public void methodB() throws InterruptedException {
    semaphore.acquire(MAX_THREADS);
    semaphore.release(MAX_THREADS);

    // your code
}

Every thread in methodA holds a single permit which is released when the thread leaves methodA.

Threads entering methodB wait until all 1000 permits are available (i.e. no threads in methodA), but don't hold them, which allows other threads to enter both methods while methodB is still being executed.
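A slightly more defensive sketch of the same idea, assuming the method bodies may throw: the permit taken in methodA is returned in a finally block, otherwise one failed call would leave methodB waiting forever. The class name SemaphoreGate and the Runnable body parameters are assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical wrapper: the Runnable stands in for the real method body.
class SemaphoreGate {
    private static final int MAX_THREADS = 1_000;
    private final Semaphore semaphore = new Semaphore(MAX_THREADS);

    void methodA(Runnable body) throws InterruptedException {
        semaphore.acquire();                 // one permit per thread inside methodA
        try {
            body.run();
        } finally {
            semaphore.release();             // returned even if body throws
        }
    }

    void methodB(Runnable body) throws InterruptedException {
        semaphore.acquire(MAX_THREADS);      // succeeds only when no permit is held
        semaphore.release(MAX_THREADS);      // handed straight back
        body.run();
    }
}
```

MAX_THREADS must be at least the number of threads that can be inside methodA at the same time; beyond that, extra callers of methodA would block on acquire().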

Glynda answered 28/9, 2016 at 9:27 Comment(16)
But why do you need a separate permit and a thread counter? Thread count is a permit itself (the threadInA == 0 condition).Mortgagee
Thanks. However, does this disallow the situation (the 2nd situation) where multiple threads can enter methodB (and execute the application code) given that no threads are in methodA? Is the trick that semaphore.acquire() is immediately followed by semaphore.release() for this situation?Dur
@Mortgagee The semaphore is to have a waiting mechanism. AtomicInteger doesn't support waiting, while Semaphore on its own cannot do counting, unless we do something like 1k permits, thread in methodA acquires 1 permit and thread in methodB acquires all 1k permits.Glynda
@Dur if no threads are in A then there is an available permit. Thread entering B takes this permit and releases it immediately, allowing other thread to enter B. As a result, you can have multiple threads in B. Does that make sense now?Glynda
Agreed, so you just use semaphore instead of using a condition object to signal waiting thread. I like that in your case threads entering methodB might still be stopped from doing stuff if a new threads enters methodA, rather than signal all the threads waiting. It might be a useful feature.Mortgagee
@Mortgagee was that for first or second solution? :) What do you think about the edit I just added?Glynda
That was for the first one :-) I don't like the second one, because it implies 2 * MAX_THREADS compareAndSwapInt operations for each thread in methodB.Mortgagee
@Mortgagee Could you please explain what you mean by compareAndSwapInt operations? I think second solution is much simpler, but I cannot really see any problem with it.Glynda
If you look into the Semaphore.Sync.nonfairTryAcquireShared and Semaphore.Sync.tryReleaseShared methods you will see them calling compareAndSetState (which calls unsafe.compareAndSwapInt in its turn) in a loop which terminates when there are no permits left. So if you have 1000 permits, acquire and release will cause 2000 compareAndSwapInt operations. IMO it's overkill for such a synchronization. CAS cost explained.Mortgagee
@Mortgagee I have just debugged the code for all 4 calls that are made to the semaphore and I cannot see any loop body executing more than once. Putting a breakpoint inside compareAndSetState and calling methodA once and methodB once results in exactly 4 hits.Glynda
You're right. I've missed that it does a CAS for (available - acquires) and not for each permit. Pretty dumb of me to suggest java.util.concurrent guys would do something like that, I should have looked twice. But still, my solution is slightly better because it does only 2 CAS operations per a thread in methodA, and yours - 2 CAS operations per thread in methodA and per thread in methodB :-)Mortgagee
@Mortgagee sounds a little bit like premature optimisation ;) We don't know what OP's code does, there might be other bigger bottlenecks than this ;) Anyway, thanks for your input.Glynda
@JaroslawPawlak Yes, I have seen it (and your discussions with @bashnesnos). Thanks. I will try your code. And I will accept it if it works and no better ones appear in one day.Dur
@JaroslawPawlak Consider this situation (w.r.t the second approach): Thread B enters methodB and executes the first two semaphore statements. Now Thread A enters methodA and executes semaphore.acquire() and the code. Before Thread A executing semaphore.release(), Thread B begins to execute the code in methodB. This is not what I want. But maybe I have not clearly specified the requirements of the synchronization between methodA and methodB.Dur
@Dur As long as there is at least one thread executing methodA, all threads entering methodB will wait at line semaphore.acquire(MAX_THREADS); until all threads leave methodA. Is that not what you want? Can you clarify what should happen when thread 1 attempts to enter methodB while thread 2 is still in methodA?Glynda
@JaroslawPawlak I realize that I have not clearly specified the requirements of the synchronization between methodA and methodB. Your approach works for the current requirement (I will accept it), but not for my original intention (maybe I should first clarify it and then post it in another thread). Thanks.Dur
F
0

Why not use some kind of external orchestrator? I mean another class that is responsible for calling methodA or methodB when it is allowed. Multi-threading can still be handled via locking, or maybe just with some AtomicBoolean(s).

Please find below a naive draft of how to do it.

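import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;

public class MyOrchestrator {

    @Autowired
    private ClassWithMethods classWithMethods;

    private final AtomicBoolean aBoolean = new AtomicBoolean(true);

    public Object callTheDesiredMethodIfPossible(Method method, Object... params)
            throws ReflectiveOperationException {
        if (aBoolean.compareAndSet(true, false)) {
            try {
                return method.invoke(classWithMethods, params);
            } finally {
                // Reset the flag even if the invoked method throws.
                aBoolean.set(true);
            }
        }
        if ("methodA".equals(method.getName())) {
            return method.invoke(classWithMethods, params);
        }
        return null; // the call was not allowed right now
    }

}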
Fortify answered 28/9, 2016 at 9:1 Comment(1)
An external orchestrator might be a good idea (or a bad idea, depending on context, and if I understand the hengxin correctly with an external orchestrator we must build a system so that the class where methodA and B reside can only be asked via the orchestrator). However, I think the question still remains, because you suggest a place where to do the magic, but the question is about how to do the magic.Lading
I
0

You can't really prevent methodA or methodB from being called (while other threads are inside the other method), but you can implement inter-thread communication in such a way that you still achieve what you want.

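class MutualEx {
   private boolean lock = false;

   public synchronized void methodA() {
      while (lock) { // loop guards against spurious wakeups
         try {
            wait();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status and give up
            return;
         }
      }
      //do some processing
      lock = true;
      notifyAll();
   }

   public synchronized void methodB() {
      while (!lock) { // loop guards against spurious wakeups
         try {
            wait();
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status and give up
            return;
         }
      }

      //do some processing
      lock = false;
      notifyAll();
   }
}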

Now, for this to work, every thread you create must hold a reference to the same MutualEx instance.

Iglesias answered 28/9, 2016 at 9:16 Comment(3)
In this case there can't be more than one thread in methodA or methodB at a time because of the "synchronized" keyword.Mortgagee
Yes, I misunderstood the OPs question and he made many edits. Since interthread communication requires synchronized to be used, this is obviously not what he/she wants.Iglesias
@Iglesias Thanks for your efforts. The discussions help me better understand and update my own problem.Dur
L
0

In very simple terms, all you need is to ENTER methodB only if no thread is inside methodA.

  • You can simply have a global counter, initialized to 0, that records the number of threads currently inside methodA(). You should have a lock/mutex to protect the variable count.
  • Threads entering methodA do count++.
  • Threads exiting methodA do count--.
  • Threads entering methodB should first check whether count == 0.

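    private final Lock mutex = new ReentrantLock(); // protects count
    private int count = 0;                          // threads currently inside methodA

    public void methodA() {
      mutex.lock();
      count++;
      mutex.unlock();

      //do stuff

      mutex.lock();
      count--;
      mutex.unlock();
    }

    public void methodB() {
      mutex.lock();
      if (count != 0) {
          mutex.unlock();
          return; // some thread is in methodA: give up instead of waiting
      }
      mutex.unlock();
      //do stuff
    }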
    
Lambda answered 28/9, 2016 at 9:33 Comment(2)
Thanks. However, does this disallow the situation (the 2nd situation) where multiple threads can enter methodB (and execute the application code) given that no threads are in methodA? Will the threads calling methodB have to wait on mutex.lock()?Dur
No, it doesn't prevent multiple threads from entering A. The mutex is only there to protect 'count'. It only ensures that no two threads can access the variable 'count' at the same time. :))Lambda
M
0

You would need an int to count the threads in methodA, and a Condition obtained from a ReentrantLock to signal all threads waiting in methodB once there are no threads in methodA:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

AtomicInteger threadsInMethodA = new AtomicInteger(0);
Lock threadsForMethodBLock = new ReentrantLock();
Condition signalWaitingThreadsForMethodB = threadsForMethodBLock.newCondition();

public void methodA() {
    threadsInMethodA.incrementAndGet();
    //do stuff
    if (threadsInMethodA.decrementAndGet() == 0) {
        // The last thread out wakes everyone waiting in methodB.
        threadsForMethodBLock.lock();
        try {
            signalWaitingThreadsForMethodB.signalAll();
        } finally {
            threadsForMethodBLock.unlock();
        }
    }
}

public void methodB() {
    threadsForMethodBLock.lock();
    try {
        while (!Thread.currentThread().isInterrupted() && threadsInMethodA.get() != 0) {
            try {
                signalWaitingThreadsForMethodB.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Not sure if you should continue doing stuff in case of interruption");
            }
        }
        signalWaitingThreadsForMethodB.signalAll();
    } finally {
        threadsForMethodBLock.unlock();
    }
    //do stuff

}

So each thread entering methodB will first check that nobody is in methodA, and then signal the previously waiting threads. On the other hand, each thread entering methodA will increment the counter to prevent new threads from doing work in methodB, and on decrement it will release all the threads waiting to do stuff in methodB if no threads are left inside methodA.

Mortgagee answered 28/9, 2016 at 9:34 Comment(0)
