Piping data between threads with Java

2

7

I am writing a multi-threaded application that mimics a movie theater. Each person involved is its own thread and concurrency must be done completely by semaphores. The only issue I am having is how to basically link threads so that they can communicate (via a pipe for instance).

For instance:

Customer[1], which is a thread, acquires a semaphore that lets it walk up to the Box Office. Now Customer[1] must tell the Box Office Agent that they want to see movie "X". Then BoxOfficeAgent[1], also a thread, must check that the movie isn't full and either sell a ticket or tell Customer[1] to pick another movie.

How do I pass that data back and forth while still maintaining concurrency with the semaphores?

Also, the only class I can use from java.util.concurrent is the Semaphore class.

Subreption answered 9/4, 2011 at 4:36 Comment(1)
The main hint I would give is not to get a mental block with the word "pipe". Think more about having a 'box' with the information in it, and you put a post-it note on the box to tell the other thread when there's something interesting in the box for it to look at. – Lody
8

One easy way to pass data back and forth between threads is to use the implementations of the interface BlockingQueue<E>, located in the package java.util.concurrent.

This interface has methods to add elements to the collection with different behaviors:

  • add(E): adds if possible, otherwise throws exception
  • boolean offer(E): returns true if the element has been added, false otherwise
  • boolean offer(E, long, TimeUnit): tries to add the element, waiting the specified amount of time
  • put(E): blocks the calling thread until the element has been added

It also defines methods for element retrieval with similar behaviors:

  • take(): blocks until there's an element available
  • poll(long, TimeUnit): waits up to the specified amount of time for an element, and returns null if none arrives
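To make the differences between these methods concrete, here is a minimal sketch on a queue of capacity 1. The class name QueueMethodsDemo and the 10 ms timeouts are made up for illustration; only the BlockingQueue calls themselves come from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records what each call does on a full/empty queue.
class QueueMethodsDemo {
  static String run() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    StringBuilder trace = new StringBuilder();
    try {
      trace.append(queue.offer("X"));                 // true: there is room
      trace.append(' ').append(queue.offer("Y"));     // false: the queue is full
      trace.append(' ').append(
          queue.offer("Y", 10, TimeUnit.MILLISECONDS)); // false once the wait expires
      try {
        queue.add("Y");                               // full queue: add throws
      } catch (IllegalStateException full) {
        trace.append(" add-threw");
      }
      trace.append(' ').append(queue.take());         // "X": an element is available
      trace.append(' ').append(
          queue.poll(10, TimeUnit.MILLISECONDS));     // null: nothing arrived in time
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      trace.append(" interrupted");
    }
    return trace.toString();
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

put(E) is left out on purpose: on a full queue with no consumer it would block the demo forever.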

The implementations I use most frequently are: ArrayBlockingQueue, LinkedBlockingQueue and SynchronousQueue.

The first one, ArrayBlockingQueue, has a fixed size, defined by a parameter passed to its constructor.

The second, LinkedBlockingQueue, is unbounded by default (its capacity is Integer.MAX_VALUE unless you pass a smaller one to the constructor). In practice it will accept any element: offer returns true immediately, and add never throws an exception.

The third, and to me the most interesting one, SynchronousQueue, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.
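As a rough illustration of that hand-off behavior, the sketch below shows an offer being rejected while nobody is waiting, and a put being accepted once another thread calls take. The class name HandoffDemo and the strings are invented for the example. Note that it would not satisfy the semaphores-only constraint from the question.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class: a SynchronousQueue only hands elements over directly.
class HandoffDemo {
  static String run() {
    SynchronousQueue<String> pipe = new SynchronousQueue<>();

    // No thread is waiting in take(), so the element is not accepted.
    boolean acceptedWithoutTaker = pipe.offer("nobody is waiting");

    // The "customer" blocks in put() until the main thread takes the element.
    Thread customer = new Thread(() -> {
      try {
        pipe.put("movie X");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    customer.start();

    try {
      String request = pipe.take();
      customer.join();
      return acceptedWithoutTaker + " " + request;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```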

To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:

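import java.util.concurrent.Semaphore;

class Pipe<E> {
  private E e;

  // read starts at 0 (nothing to take yet); write starts at 1 (one free slot).
  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) throws InterruptedException {
    write.acquire();   // wait until the slot is free
    this.e = e;
    read.release();    // signal that an element is available
  }

  public final E take() throws InterruptedException {
    read.acquire();    // wait until an element is available
    E e = this.e;
    write.release();   // free the slot for the next put
    return e;
  }
}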

Notice that this class presents similar behavior to what I described about the SynchronousQueue.

When put(E) is called, it acquires the write semaphore, leaving it with no permits, so that another call to the same method blocks at its first line until the element has been taken. The method then stores a reference to the object being passed and releases the read semaphore. This release makes it possible for a thread calling take() to proceed.

The first step of the take() method is then, naturally, to acquire the read semaphore. This both waits until an element is available and prevents any other thread from retrieving the same element concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that put(E) may be called again by any thread, and returns what has been saved in the local variable.

As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods take() and put(E) are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior of take() and put(E) breaking the contract.
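To tie this back to the box office in the question, here is one possible way to wire two such pipes together, one for requests and one for replies. Everything in it is an assumption made for illustration: SemaphorePipe is a compile-ready copy of the idea above under a different name, and the single seat, the two requests and the message strings are invented.

```java
import java.util.concurrent.Semaphore;

// Compile-ready copy of the one-slot pipe, renamed so this sketch is self-contained.
final class SemaphorePipe<E> {
  private E slot;
  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  void put(E value) throws InterruptedException {
    write.acquire();
    slot = value;
    read.release();
  }

  E take() throws InterruptedException {
    read.acquire();
    E value = slot;
    write.release();
    return value;
  }
}

// Hypothetical scenario: one agent thread, one customer (the calling thread).
class BoxOfficeDemo {
  static String run() {
    SemaphorePipe<String> requests = new SemaphorePipe<>();
    SemaphorePipe<String> replies = new SemaphorePipe<>();

    Thread agent = new Thread(() -> {
      int seatsLeft = 1; // assumed: a single seat left for every movie
      try {
        for (int i = 0; i < 2; i++) { // serve exactly two requests, then stop
          String movie = requests.take();
          if (seatsLeft > 0) {
            seatsLeft--;
            replies.put("ticket for " + movie);
          } else {
            replies.put(movie + " is full");
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    agent.start();

    try {
      requests.put("X");
      String first = replies.take();
      requests.put("X");
      String second = replies.take();
      agent.join();
      return first + "; " + second;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints "ticket for X; X is full"
  }
}
```

With several customers, the semaphore that lets a customer walk up to the box office would have to be held for the whole request/reply exchange, so that one customer cannot pick up another customer's reply.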

Finally, you could avoid the need to declare a local variable in the take() method by using try {} finally {} as follows:

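class Pipe<E> {
  // ...
  public final E take() throws InterruptedException {
    // Acquire outside the try: if this call is interrupted,
    // the finally block must not release the write semaphore.
    read.acquire();
    try {
      return e;          // the return value is evaluated before finally runs
    } finally {
      write.release();
    }
  }
}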

Here, the point of this example is just to show a use of try/finally that often goes unnoticed by inexperienced developers. Obviously, in this case, there's no real gain.

Oh damn, I've mostly finished your homework for you. In return -- and for you to test your knowledge about Semaphores --, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an offer(E) method and a poll(long, TimeUnit)!
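If you get stuck, one possible shape for a non-blocking insert and a timed retrieval is sketched below, built on Semaphore.tryAcquire(). The names TryPipe and TryPipeDemo are invented, and the semantics are a simplification: unlike SynchronousQueue.offer, this offer succeeds whenever the slot is free, even if no thread is waiting to take.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical one-slot pipe with non-blocking and timed operations.
final class TryPipe<E> {
  private E slot;
  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  // Non-blocking insert: succeeds only if the slot is currently free.
  boolean offer(E value) {
    if (!write.tryAcquire()) {
      return false;
    }
    slot = value;
    read.release();
    return true;
  }

  // Timed retrieval: waits up to the timeout, returns null if nothing arrives.
  E poll(long timeout, TimeUnit unit) throws InterruptedException {
    if (!read.tryAcquire(timeout, unit)) {
      return null;
    }
    E value = slot;
    slot = null;       // drop the reference once it has been handed over
    write.release();
    return value;
  }
}

class TryPipeDemo {
  static String run() {
    TryPipe<String> pipe = new TryPipe<>();
    try {
      boolean first = pipe.offer("X");                        // slot was free
      boolean second = pipe.offer("Y");                       // slot is occupied
      String got = pipe.poll(10, TimeUnit.MILLISECONDS);      // the stored element
      String none = pipe.poll(10, TimeUnit.MILLISECONDS);     // timeout, nothing stored
      return first + " " + second + " " + got + " " + none;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```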

Good luck.

Spencerianism answered 9/4, 2011 at 4:40 Comment(4)
This won't satisfy the homework requirement that "concurrency must be done completely by semaphores." (Of course in real life using one of the high-level concurrency utilities is the best option.) – Franck
Indeed! I missed that at first. – Spencerianism
Ya, the only item I can use from java.util.concurrent is semaphore. No other thread-safe classes can be used... So should this be done with pipes, and if so how? – Subreption
@JustinY17: take a look at the Pipe<E> class I've added to the answer! And then, to practice, try to implement the other methods defined by BlockingQueue<E>, based on the two I gave! – Spencerianism
1

Think of it in terms of shared memory with a read/write lock.

  1. Create a buffer to put the message.
  2. The access to the buffer should be controlled by using a lock/semaphore.
  3. Use this buffer for inter-thread communication.
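The three steps above could be sketched as a small bounded buffer guarded only by semaphores. This is one common arrangement, not necessarily what the answer had in mind: the names SharedBuffer and SharedBufferDemo, the capacity of 2 and the five integers are all invented for illustration. The only class used from java.util.concurrent is Semaphore.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

// Hypothetical shared buffer: every access is controlled by semaphores.
final class SharedBuffer<E> {
  private final Deque<E> items = new ArrayDeque<>();
  private final Semaphore mutex = new Semaphore(1);   // guards the deque itself
  private final Semaphore filled = new Semaphore(0);  // counts stored items
  private final Semaphore free;                       // counts empty slots

  SharedBuffer(int capacity) {
    free = new Semaphore(capacity);
  }

  void put(E item) throws InterruptedException {
    free.acquire();               // wait for an empty slot
    mutex.acquire();
    try {
      items.addLast(item);
    } finally {
      mutex.release();
    }
    filled.release();             // tell readers there is something to take
  }

  E take() throws InterruptedException {
    filled.acquire();             // wait for an item
    mutex.acquire();
    E item;
    try {
      item = items.removeFirst();
    } finally {
      mutex.release();
    }
    free.release();               // tell writers a slot is empty again
    return item;
  }
}

class SharedBufferDemo {
  // A producer thread sends 1..5; the calling thread receives and sums them.
  static int run() {
    SharedBuffer<Integer> buffer = new SharedBuffer<>(2);
    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= 5; i++) {
          buffer.put(i);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    int sum = 0;
    try {
      for (int i = 0; i < 5; i++) {
        sum += buffer.take();
      }
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sum;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints 15
  }
}
```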

Regards

PKV

Ogdon answered 9/4, 2011 at 6:43 Comment(0)
