AtomicReference to a mutable object and visibility

Say I have an AtomicReference to a list of objects:

AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());

Thread A adds elements to this list: batch.get().add(o);

Later, thread B takes the list and, for example, stores it in a DB: insertBatch(batch.get());

Do I have to do additional synchronization when writing (Thread A) and reading (Thread B) to ensure thread B sees the list the way A left it, or is this taken care of by the AtomicReference?

In other words: if I have an AtomicReference to a mutable object, and one thread changes that object, do other threads see this change immediately?

Edit:

Maybe some example code is in order:

public void process(Reader in) throws IOException {
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
    ExecutorService exec = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i) {
        tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
            @Override public AtomicReference<List<Object>> call() throws IOException {

                final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                Processor.this.parser.parse(in, new Parser.Handler() {
                    @Override public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                    }
                });

                return batch;
            }
        }));
    }

    List<Object> remainingBatches = new ArrayList<Object>();

    for (Future<AtomicReference<List<Object>>> task : tasks) {
        try {
            AtomicReference<List<Object>> remainingBatch = task.get();
            remainingBatches.addAll(remainingBatch.get());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof IOException) {
                throw (IOException)cause;
            }

            throw (RuntimeException)cause;
        }
    }

    // these haven't been flushed yet by the worker threads
    if (!remainingBatches.isEmpty()) {
        dao.insertBatch(remainingBatches);
    }
}

What happens here is that I create four worker threads to parse some text (this is the Reader in parameter to the process() method). Each worker saves the lines it has parsed in a batch, and flushes the batch when it is full (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));).

Since the number of lines in the text isn't a multiple of the batch size, the last objects end up in a batch that never gets flushed, because it never fills up. These remaining batches are therefore inserted by the main thread.

I use AtomicReference.getAndSet() to replace the full batch with an empty one. Is this program correct with regard to threading?

Race answered 21/2, 2012 at 13:19 Comment(0)

Um... it doesn't really work like this. AtomicReference guarantees that the reference itself is visible across threads, i.e. if you assign it a different reference than the original one, the update will be visible. It makes no guarantees about the actual contents of the object that the reference is pointing to.

Therefore, read/write operations on the list contents require separate synchronization.
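
For illustration, here is a minimal sketch of the problem described above, with made-up class and method names: the reference itself is published safely, but concurrent structural changes to the ArrayList behind it are not synchronized at all.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class UnsafeSharedListSketch {
    static final AtomicReference<List<Object>> batch =
            new AtomicReference<List<Object>>(new ArrayList<Object>());

    // Thread A: unsynchronized mutation of the ArrayList behind the reference.
    static void producer(Object o) {
        batch.get().add(o);
    }

    // Thread B, running concurrently with producer(): may observe a stale size,
    // a partially updated list, or even get a ConcurrentModificationException.
    static void consumer() {
        for (Object o : batch.get()) {
            System.out.println(o);
        }
    }
}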

Edit: So, judging from your updated code and the comment you posted, making the batch reference volatile is sufficient to ensure visibility.
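
A minimal sketch of the volatile alternative mentioned here and in the comments below, assuming the list is fully built before it is published and is not mutated afterwards (all names are illustrative):

import java.util.ArrayList;
import java.util.List;

class VolatilePublishSketch {
    // A plain volatile field is enough when no compare-and-swap is needed.
    private volatile List<Object> batch = new ArrayList<Object>();

    // Writer thread: build a fresh list, then publish it in one assignment (volatile write).
    void publish(List<Object> full) {
        batch = full;
    }

    // Reader thread: the volatile read guarantees it sees everything that was
    // written to the list before it was assigned to the field.
    List<Object> current() {
        return batch;
    }
}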

Stovall answered 21/2, 2012 at 13:22 Comment(6)
Okay, I added some example code to my question above. I use AtomicReference.getAndSet() to replace a full batch with a fresh empty one. Do I still need additional synchronization?Race
Yes, your code is correct, although the use of AtomicReference does not seem to be needed here.Stovall
@Stovall was thinking the exact same thing. In fact the getAndSet() may not do what he wants, because it will get the current value and then change the value of the AtomicReference.Oberland
@Tudor: yes, that's what I was thinking too. But the problem is that batch has to be declared final to be able to use it in the onNewObject() method of my (anonymous) handler. Anyway, I've decided to ditch the anonymous inner Callable class in favor of a more readable and testable ParseAndInsertTask class that implements Callable, and make batch an instance variable of the task, instead of a local variable.Race
Note that if you do not need to do atomic compare-and-swap operations, you can just use a volatile variable rather than an AtomicReference.Opinionative
@Bossie: in that case volatile should be sufficient, although the call to Future.get may introduce a memory fence anyway; I'm not sure. Just leave it as volatile.Stovall

I think that, forgetting all the code here, your exact question is this:

Do I have to do additional synchronization when writing (Thread A) and reading (Thread B) to ensure thread B sees the list the way A left it, or is this taken care of by the AtomicReference?

So, the exact response to that is: YES, atomics take care of visibility. And it is not my opinion but the JDK documentation's:

The memory effects for accesses and updates of atomics generally follow the rules for volatiles, as stated in The Java Language Specification, Third Edition (17.4 Memory Model).
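
Concretely, that rule means a set() on an AtomicReference acts like a volatile write and a get() like a volatile read. A minimal sketch of the resulting happens-before guarantee (all names below are made up):

import java.util.concurrent.atomic.AtomicReference;

class PublishSketch {
    static final int[] data = new int[1];                         // plain, non-volatile data
    static final AtomicReference<String> flag = new AtomicReference<String>();

    public static void main(String[] args) {
        Thread a = new Thread(new Runnable() {
            public void run() {
                data[0] = 42;            // ordinary write
                flag.set("ready");       // acts as a volatile write
            }
        });
        Thread b = new Thread(new Runnable() {
            public void run() {
                while (flag.get() == null) { }   // acts as volatile reads
                // Guaranteed to print 42: the write to data[0] happens-before the set(),
                // which happens-before the get() that observed "ready".
                System.out.println(data[0]);
            }
        });
        a.start();
        b.start();
    }
}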

I hope this helps.

Teapot answered 14/8, 2012 at 16:45 Comment(0)

Adding to Tudor's answer: you will have to make the ArrayList itself threadsafe or - depending on your requirements - even synchronize larger code blocks.

If you can get away with a threadsafe ArrayList you can "decorate" it like this:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>());

But keep in mind: even "simple" compound constructs like the following are not threadsafe:

Object o = batch.get(batch.size()-1);
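
A small sketch of how to make such a compound operation safe: the synchronizedList documentation requires manually synchronizing on the returned list when traversing it, and the same lock also makes check-then-act sequences like the one above atomic (class and method names are made up):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SynchronizedListSketch {
    static final List<Object> batch =
            Collections.synchronizedList(new ArrayList<Object>());

    // size() and get() are individually thread-safe, but the pair is not;
    // holding the list's own lock makes them act as one unit.
    static Object lastOrNull() {
        synchronized (batch) {
            return batch.isEmpty() ? null : batch.get(batch.size() - 1);
        }
    }
}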
V1 answered 21/2, 2012 at 13:33 Comment(0)

The AtomicReference will only help you with the reference to the list; it will not do anything to the list itself. More particularly, in your scenario, you will almost certainly run into problems when the system is under load and the consumer takes the list while the producer is still adding an item to it.

This sounds to me like you should be using a BlockingQueue. You can then limit the memory footprint if your producer is faster than your consumer and let the queue handle all contention.

Something like:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50);

// ... Producer
queue.put(o);

// ... Consumer
List<Object> queueContents = new ArrayList<Object> ();
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items.
queue.drainTo(queueContents);
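
(One caveat with this sketch: drainTo(...) does not block; if the queue happens to be empty it returns immediately with nothing. A consumer that should wait for at least one element typically does something like queueContents.add(queue.take()); queue.drainTo(queueContents); before processing the chunk.)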

Added

Thanks to @Tudor for pointing out the architecture you are using. ... I have to admit it is rather strange. You don't really need AtomicReference at all as far as I can see. Each thread owns its own ArrayList until it is passed on to dao, at which point it is replaced, so there is no contention at all anywhere.

I am a little concerned about you creating four parsers on a single Reader. I hope you have some way of ensuring each parser does not affect the others.

I personally would use some form of producer-consumer pattern as I have described in the code above. Something like this perhaps.

static final int PROCESSES = 4;
static final int batchSize = 10;

public void process(Reader in) throws IOException, InterruptedException {

  final List<Future<Void>> tasks = new ArrayList<Future<Void>>();
  ExecutorService exec = Executors.newFixedThreadPool(PROCESSES);
  // Queue of objects.
  final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2);
  // The final object to post.
  final Object FINISHED = new Object();

  // Start the producers.
  for (int i = 0; i < PROCESSES; i++) {
    tasks.add(exec.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException, InterruptedException {

        Processor.this.parser.parse(in, new Parser.Handler() {
          @Override
          public void onNewObject(Object event) {
            try {
              // put() blocks when the queue is full, so producers cannot outrun the consumer.
              queue.put(event);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RuntimeException(e);
            }
          }
        });
        // Post a FINISHED marker down the queue; put() may block if the queue is full.
        queue.put(FINISHED);
        return null;
      }
    }));
  }

  // Start the consumer.
  tasks.add(exec.submit(new Callable<Void>() {
    @Override
    public Void call() throws IOException, InterruptedException {
      List<Object> batch = new ArrayList<Object>(batchSize);
      int finishedCount = 0;
      // Until all threads finished.
      while ( finishedCount < PROCESSES ) {
        Object o = queue.take();
        if ( o != FINISHED ) {
          // Batch them up.
          batch.add(o);
          if ( batch.size() >= batchSize ) {
            dao.insertBatch(batch);
            // If insertBatch takes a copy we could merely clear it.
            batch = new ArrayList<Object>(batchSize);
          }
        } else {
          // Count the finishes.
          finishedCount += 1;
        }
      }
      // Finished! Post any incomplete batch.
      if ( batch.size() > 0 ) {
        dao.insertBatch(batch);
      }
      return null;
    }
  }));

  // Wait for everything to finish.
  exec.shutdown();
  // Wait until all is done.
  boolean finished = false;
  do {
    try {
      // Wait up to 1 second for termination.
      finished = exec.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
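      // Interrupted while waiting; ignore and keep polling until the pool terminates.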
    }
  } while (!finished);
}
Seumas answered 21/2, 2012 at 16:41 Comment(1)
Umm... from his code it doesn't look like producer-consumer. He's actually spawning a team of threads, each doing some work, then joining them and finishing the work in the main thread. There is no actual data being passed between threads.Stovall
