Can a non-thread-safe value be safely ported across thread boundaries using fork/join?

I have a class which is not thread-safe:

class ThreadUnsafeClass {
  long i;

  long incrementAndGet() { return ++i; }
}

(I've used a long field here, but think of it as a stand-in for some arbitrary thread-unsafe type.)

I now have a class which looks like this:

class Foo {
  final ThreadUnsafeClass c;

  Foo(ThreadUnsafeClass c) {
    this.c = c;
  }
}

That is, the thread-unsafe object is held in a final field of Foo. Now I'm going to do this:

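import java.util.concurrent.ForkJoinTask;

public class JavaMM {
  public static void main(String[] args) {
    // The task is typed by what it returns: a Foo wrapping the mutated object
    final ForkJoinTask<Foo> work = ForkJoinTask.adapt(() -> {
      ThreadUnsafeClass t = new ThreadUnsafeClass();
      t.incrementAndGet(); // the only mutation, performed inside the task
      return new Foo(t);
    });

    assert (work.fork().join().c.i == 1);
  }
}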

That is, from thread T (main), I invoke some work on T' (the fork-join pool) which creates and mutates an instance of my unsafe class and then returns the result wrapped in a Foo. Note that all mutation of my thread-unsafe object happens on a single thread, T'.

Question 1: Am I guaranteed that the end state of the thread-unsafe instance is safely ported across the T' ~> T thread boundary at the join?

Question 2: What if I had done this using parallel streams? For example:

Map<Long, Foo> results =
  Stream
    .of(new ThreadUnsafeClass())
    .parallel()
    .map(tuc -> {
      tuc.incrementAndGet();
      return new Foo(tuc);
    })
    .collect(
      Collectors.toConcurrentMap(
        foo -> foo.c.i,
        Function.identity()
      )
    );
// The keys are Long, so look up 1L: get(1) would box to Integer and always return null
assert (results.get(1L) != null);
Appolonia asked 11/1, 2018 at 11:21
I don't think there is any guarantee of safety; however, you seem to be accessing it from only one thread at a time. Declaring the field in your Foo wrapper as final should make no difference. Have you looked at the Atomic options in java.util.concurrent.atomic? – Binoculars
It's just a question of which threads are able to access a given ThreadUnsafeClass object, no matter how (via some ThreadUnsafeClass tuc or via some Foo foo as foo.c), and then whether they call incrementAndGet. Merely passing a ThreadUnsafeClass reference from here to there isn't going to affect it. If you just need a unique token to identify Foo instances, there are better ways. – Monkey
Using a long adds complexity, since long and double are the only types whose (non-volatile) reads and writes are not guaranteed to be atomic. In theory, one thread could read the first half of the long, another thread could then update it, and the first thread could read the second (updated) half. In the given scenario this has no effect, but having multiple threads execute incrementAndGet() could lead to very messy situations. – Toady
@Monkey - Can you edit this comment? It's not at all clear to me what you mean. – Appolonia
@Toady - a given instance of ThreadUnsafeClass is created and mutated only from within a single thread T'. After mutation has finished, it is passed across a thread boundary. The question is: are all mutations in T' subsequently visible to T? – Appolonia
@Appolonia I know, hence the last two sentences of my previous comment. – Toady
@Appolonia At least for your first question, I am reasonably sure that the result is guaranteed, since you have a happens-before relationship between incrementAndGet() and the access to i, and you do not access i beforehand, so caching should not be an issue. – Toady
@Toady - I see now. I am still none the wiser as to the answer to my question. – Appolonia
@Toady - that was in reply to your previous comment. I think you are almost certainly correct here, because the FJP (fork-join pool) must provide a happens-before (HB) edge at the join. But I'm surprised that its documentation doesn't mention this (and neither does the documentation on parallel streams). – Appolonia
Well, my problem is that you talk about "class instances ported across thread boundaries". That wording is totally alien to me, as there is never any "porting" (moving from here to there) of class instances (objects) at all. What you copy is references, and as long as nobody calls its methods, an object of a class that uses information hiding properly isn't affected. – Monkey
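The comments suggest the java.util.concurrent.atomic classes and point out that plain long fields may be read or written non-atomically. That only matters when several threads touch the same instance, which is not the question's single-writer scenario. For that multi-writer case, here is a minimal sketch, assuming an AtomicLong is an acceptable replacement for the plain field; SafeCounter and SafeCounterDemo are hypothetical names, not from the question:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

// Hypothetical thread-safe counterpart of the question's ThreadUnsafeClass.
class SafeCounter {
    // AtomicLong makes each increment atomic and its result visible to other threads,
    // so there are no lost updates and no torn 64-bit reads.
    private final AtomicLong i = new AtomicLong();

    long incrementAndGet() { return i.incrementAndGet(); }

    long get() { return i.get(); }
}

class SafeCounterDemo {
    // Increments one shared SafeCounter n times from a parallel stream.
    static long countInParallel(int n) {
        SafeCounter c = new SafeCounter();
        IntStream.range(0, n).parallel().forEach(k -> c.incrementAndGet());
        return c.get();
    }

    public static void main(String[] args) {
        System.out.println(countInParallel(10_000)); // prints 10000
    }
}
```

With the original ThreadUnsafeClass, the same parallel loop could lose increments, because ++i is a read-modify-write that is not atomic.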

I think ForkJoinTask.join() has the same memory effects as Future.get(), because the join() Javadoc describes it as basically get() with differences in interruption and exception handling. And Future.get() is specified as:

Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.

In other words, this is basically a "safe publication" via the Future/FJT (ForkJoinTask). This means that anything the executor thread did before publishing the FJT result is visible to threads that obtain it via FJT.join(). Since the example allocates the object and populates its field only within the executor thread, and nothing happens to the object after it is returned from the executor, it stands to reason that we can only see the values the executor thread produced.

Note that wrapping the object in a final field brings no additional benefit here. Even if you just did plain (non-final) field stores, you would still be guaranteed this:

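import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PublishViaFuture { // enclosing class name is illustrative
    public static void main(String... args) throws Exception {
        ExecutorService s = Executors.newCachedThreadPool();
        Future<MyObject> f = s.submit(() -> new MyObject(42));
        assert (f.get().x == 42); // guaranteed!
        s.shutdown();
    }

    public static class MyObject {
        int x; // plain field: neither final nor volatile
        public MyObject(int x) { this.x = x; }
    }
}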
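The same reasoning can be applied to the question's own ForkJoinTask.adapt(...).fork().join() path. A minimal sketch, assuming (as argued above) that join() has the same memory effects as Future.get(); PlainHolder and JoinPublication are hypothetical names, and the field is deliberately plain (neither final nor volatile):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;

// Hypothetical holder with a plain field, standing in for ThreadUnsafeClass.
class PlainHolder {
    long i; // plain field: not final, not volatile

    PlainHolder(long i) { this.i = i; }
}

class JoinPublication {
    static long runOnce() {
        // An explicitly typed Callable selects the adapt(Callable) overload.
        Callable<PlainHolder> job = () -> {
            PlainHolder h = new PlainHolder(0);
            h.i++; // all mutation happens inside the task
            return h;
        };
        // fork() from a non-pool thread schedules the task in the common pool;
        // join() waits for completion and returns the task's result.
        return ForkJoinTask.adapt(job).fork().join().i;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints 1
    }
}
```

A run that prints 1 does not prove the guarantee, since visibility problems are timing-dependent; the guarantee rests on the documented happens-before edge, not on observation.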

But notice that in the Stream example (if we assume a symmetry between Stream.of(...).parallel() and Executor.submit, and between Stream.collect and FJT.join/Future.get), you created the object in the caller thread and then passed it to the executor to mutate. This is a subtle difference, but it still does not matter, because submission also establishes HB, which precludes seeing the old state of the object:

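import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MutateAfterSubmit { // enclosing class name is illustrative
    public static void main(String... args) throws Exception {
        ExecutorService s = Executors.newCachedThreadPool();
        MyObject o = new MyObject(42);
        Future<?> f = s.submit(() -> o.x++); // new --hb--> submit
        f.get(); // get --hb--> read o.x
        assert (o.x == 43); // guaranteed
        s.shutdown();
    }

    public static class MyObject {
        int x;
        public MyObject(int x) { this.x = x; }
    }
}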

(Formally, this is because every HB path from the constructor's store(o.x, 42) to read(o.x) goes through the executor thread's store(o.x, 43).)
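For the second example, that argument can be spelled out as a happens-before chain. A sketch in the usual JMM notation, where w and r are writes and reads of o.x, po is program order and hb is happens-before; the last edge is the one Future.get() provides:

```latex
\[
w(o.x{=}42) \xrightarrow{\;po\;} \mathit{submit} \xrightarrow{\;hb\;} w(o.x{=}43) \xrightarrow{\;hb\;} r(o.x)
\]
```

Because w(o.x=42) hb w(o.x=43) hb r(o.x), and both writes are to the same field, happens-before consistency rules out r(o.x) seeing 42; the only write it may see is 43.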

Cnut answered 11/1, 2018 at 12:36
