Extending java's ThreadLocal to allow the values to be reset across all threads
Asked Answered
G

3

1

After looking at this question, I think I want to wrap ThreadLocal to add a reset behavior.

I want to have something similar to a ThreadLocal, with a method I can call from any thread to set all the values back to the same value. So far I have this:

public class ThreadLocalFlag {

    private ThreadLocal<Boolean> flag;
    private List<Boolean> allValues = new ArrayList<Boolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<Boolean>() {
            @Override protected Boolean initialValue() {
                Boolean value = false;
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get();
    }

    public void set(Boolean value) {
        flag.set(value);
    }

    public void setAll(Boolean value) {
        for (Boolean tlValue : allValues) {
            tlValue = value;
        }
    }
}

I'm worried that the autoboxing of the primitive may mean the copies I've stored in the list will not reference the same variables referenced by the ThreadLocal if I try to set them. I've not yet tested this code, and with something tricky like this I'm looking for some expert advice before I continue down this path.

Someone will ask "Why are you doing this?". I'm working in a framework where there are other threads that callback into my code, and I don't have references to them. Periodically I want to update the value in a ThreadLocal variable they use, so performing that update requires that the thread which uses the variable do the updating. I just need a way to notify all these threads that their ThreadLocal variable is stale.


I'm flattered that there is new criticism recently regarding this three year old question, though I feel the tone of it is a little less than professional. The solution I provided has worked without incident in production during that time. However, there are bound to be better ways to achieve the goal that prompted this question, and I invite the critics to supply an answer that is clearly better. To that end, I will try to be more clear about the problem I was trying to solve.

As I mentioned earlier, I was using a framework where multiple threads are using my code, outside my control. That framework was QuickFIX/J, and I was implementing the Application interface. That interface defines hooks for handling FIX messages, and in my usage the framework was configured to be multithreaded, so that each FIX connection to the application could be handled simultaneously.

However, the QuickFIX/J framework only uses a single instance of my implementation of that interface for all the threads. I'm not in control of how the threads get started, and each is servicing a different connection with different configuration details and other state. It was natural to let some of that state, which is frequently accessed but seldom updated, live in various ThreadLocals that load their initial value once the framework has started the thread.

Elsewhere in the organization, we had library code to allow us to register for callbacks for notification of configuration details that change at runtime. I wanted to register for that callback, and when I received it, I wanted to let all the threads know that it's time to reload the values of those ThreadLocals, as they may have changed. That callback comes from a thread I don't control, just like the QuickFIX/J threads.

My solution below uses ThreadLocalFlag (a wrapped ThreadLocal<AtomicBoolean>) solely to signal the other threads that it may be time to update their values. The callback calls setAll(true), and the QuickFIX/J threads call set(false) when they begin their update. I have downplayed the concurrency issues of the ArrayList because the only time the list is added to is during startup, and my use case was smaller than the default size of the list.

I imagine the same task could be done with other interthread communication techniques, but for what it's doing, this seemed more practical. I welcome other solutions.

Guajardo answered 31/1, 2012 at 23:13 Comment(0)
G
-1

I'm disappointed in the quality of the answers received for this question; I have found my own solution.

I wrote my test case today, and found the only issue with the code in my question is the Boolean. Boolean is not mutable, so my list of references wasn't doing me any good. I had a look at this question, and changed my code to use AtomicBoolean, and now everything works as expected.

public class ThreadLocalFlag {

    private ThreadLocal<AtomicBoolean> flag;
    private List<AtomicBoolean> allValues = new ArrayList<AtomicBoolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<AtomicBoolean>() {
            @Override protected AtomicBoolean initialValue() {
                AtomicBoolean value = new AtomicBoolean();
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get().get();
    }

    public void set(boolean value) {
        flag.get().set(value);
    }

    public void setAll(boolean value) {
        for (AtomicBoolean tlValue : allValues) {
            tlValue.set(value);
        }
    }
}

Test case:

public class ThreadLocalFlagTest {

    private static ThreadLocalFlag flag = new ThreadLocalFlag();
    private static boolean runThread = true;

    @AfterClass
    public static void tearDownOnce() throws Exception {
        runThread = false;
        flag = null;
    }

    /**
     * @throws Exception if there is any issue with the test
     */
    @Test
    public void testSetAll() throws Exception {
        startThread("ThreadLocalFlagTest-1", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-2", true);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-3", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-4", true);
        try {
            Thread.sleep(8000L); //watch the alternating values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(true);
        try {
            Thread.sleep(8000L); //watch the true values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(false);
        try {
            Thread.sleep(8000L); //watch the false values
        } catch (InterruptedException e) {
            //ignore
        }
    }

    private void startThread(String name, boolean value) {
        Thread t = new Thread(new RunnableCode(value));
        t.setName(name);
        t.start();
    }

    class RunnableCode implements Runnable {

        private boolean initialValue;

        RunnableCode(boolean value) {
            initialValue = value;
        }

        @Override
        public void run() {
            flag.set(initialValue);
            while (runThread) {
                System.out.println(Thread.currentThread().getName() + ": " + flag.get());
                try {
                    Thread.sleep(4000L);
                } catch (InterruptedException e) {
                    //ignore
                }
            }
        }
    }
}
Guajardo answered 1/2, 2012 at 16:1 Comment(2)
this is not a complete solution since the List is not thread safe, if you think it is your code is either so extremely trivial that why you are doing this is silly overkill or you just do not understand what non-deterministic means, and you will soon when this introduces bizarre states because of things getting overwritten in a non-deterministic manner. You so called test case is fundamentally flawed. Anyone coming along in the future reading this should take this answer as a lesson what not to do!Fantast
@JarrodRoberson so what, just wrap list with synchronized () block on that list and it will work like a charm, whole solution will.Britishism
P
8

Interacting with objects in a ThreadLocal across threads

I'll say up front that this is a bad idea. ThreadLocal is a special class which offers speed and thread-safety benefits if used correctly. Attempting to communicate across threads with a ThreadLocal defeats the purpose of using the class in the first place.

If you need access to an object across multiple threads there are tools designed for this purpose, notably the thread-safe collections in java.util.collect.concurrent such as ConcurrentHashMap, which you can use to replicate a ThreadLocal by using Thread objects as keys, like so:

ConcurrentHashMap<Thread, AtomicBoolean> map = new ConcurrentHashMap<>();

// pass map to threads, let them do work, using Thread.currentThread() as the key

// Update all known thread's flags
for(AtomicBoolean b : map.values()) {
    b.set(true);
}

Clearer, more concise, and avoids using ThreadLocal in a way it's simply not designed for.

Notifying threads that their data is stale

I just need a way to notify all these threads that their ThreadLocal variable is stale.

If your goal is simply to notify other threads that something has changed you don't need a ThreadLocal at all. Simply use a single AtomicBoolean and share it with all your tasks, just like you would your ThreadLocal<AtomicBoolean>. As the name implies updates to an AtomicBoolean are atomic and visible cross-threads. Even better would be to use a real synchronization aid such as CyclicBarrier or Phaser, but for simple use cases there's no harm in just using an AtomicBoolean.

Creating an updatable "ThreadLocal"

All of that said, if you really want to implement a globally update-able ThreadLocal your implementation is broken. The fact that you haven't run into issues with it is only a coincidence and future refactoring may well introduce hard-to-diagnose bugs or crashes. That it "has worked without incident" only means your tests are incomplete.

  • First and foremost, an ArrayList is not thread-safe. You simply cannot use it (without external synchronization) when multiple threads may interact with it, even if they will do so at different times. That you aren't seeing any issues now is just a coincidence.
  • Storing the objects as a List prevents us from removing stale values. If you call ThreadLocal.set() it will append to your list without removing the previous value, which introduces both a memory leak and the potential for unexpected side-effects if you anticipated these objects becoming unreachable once the thread terminated, as is usually the case with ThreadLocal instances. Your use case avoids this issue by coincidence, but there's still no need to use a List.

Here is an implementation of an IterableThreadLocal which safely stores and updates all existing instances of the ThreadLocal's values, and works for any type you choose to use:

import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;

import com.google.common.collect.MapMaker;

/**
 * Class extends ThreadLocal to enable user to iterate over all objects
 * held by the ThreadLocal instance.  Note that this is inherently not
 * thread-safe, and violates both the contract of ThreadLocal and much
 * of the benefit of using a ThreadLocal object.  This class incurs all
 * the overhead of a ConcurrentHashMap, perhaps you would prefer to
 * simply use a ConcurrentHashMap directly instead?
 * 
 * If you do really want to use this class, be wary of its iterator.
 * While it is as threadsafe as ConcurrentHashMap's iterator, it cannot
 * guarantee that all existing objects in the ThreadLocal are available
 * to the iterator, and it cannot prevent you from doing dangerous
 * things with the returned values.  If the returned values are not
 * properly thread-safe, you will introduce issues.
 */
public class IterableThreadLocal<T> extends ThreadLocal<T>
                                    implements Iterable<T> {
    private final ConcurrentMap<Thread,T> map;

    public IterableThreadLocal() {
        map = new MapMaker().weakKeys().makeMap();
    }

    @Override
    public T get() {
        T val = super.get();
        map.putIfAbsent(Thread.currentThread(), val);
        return val;
    }

    @Override
    public void set(T value) {
        map.put(Thread.currentThread(), value);
        super.set(value);
    }

    /**
     * Note that this method fundamentally violates the contract of
     * ThreadLocal, and exposes all objects to the calling thread.
     * Use with extreme caution, and preferably only when you know
     * no other threads will be modifying / using their ThreadLocal
     * references anymore.
     */
    @Override
    public Iterator<T> iterator() {
        return map.values().iterator();
    }
}

As you can hopefully see this is little more than a wrapper around a ConcurrentHashMap, and incurs all the same overhead as using one directly, but hidden in the implementation of a ThreadLocal, which users generally expect to be fast and thread-safe. I implemented it for demonstration purposes, but I really cannot recommend using it in any setting.

Pustulant answered 27/3, 2013 at 7:58 Comment(5)
I think you've missed the point. I didn't want an iterator, and I didn't want any concurrent access to the same values between threads, except for the notion of a reset action. Thread safety of the ArrayList isn't an issue, as it's only used locally.Guajardo
Apologies, but I think you've missed the point. Thread safety of ArrayList is unfortunately an issue, as multiple threads may attempt to add to it (and in particular, resize it) at the same time. Your implementation may coincidentally avoid this at the moment, but it's a very dangerous pattern. When working with multiple threads, you want to minimize the amount of communication going on between threads, providing a thread-safe Iterable is less risky than sharing an ArrayList between threads. Since all you do in your example is loop over the list, you only need an Iterable.Pustulant
And I'll reiterate, you can easily, efficiently, and safely do exactly what you want with the code sample at the top of my answer. This is a much cleaner and safer way to control state between threads, ThreadLocal is fundamentally and intentionally not designed to do this.Pustulant
Excellent, worked like a charm. We are dealing with heavily multi threaded application. we would recommend this approach. we are working with an app that suppose to process 2000 msgs per second, and in that multi threaded env we process 20000, cannot share the time :). Thanks.Evanne
@aamir for concurrent message processing you should generally use a thread-safe Queue such as ConcurrentLinkedQueue or one of the BlockingQueue implementations.Pustulant
G
1

It won't be a good idea to do that since the whole point of thread local storage is, well, thread locality of the value it contains - i.e. that you can be sure that no other thread than your own thread can touch the value. If other threads could touch your thread local value, it won't be "thread local" anymore and that will break the memory model contract of thread local storage.

Either you have to use something other than ThreadLocal (e.g. a ConcurrentHashMap) to store the value, or you need to find a way to schedule an update on the threads in question.

You could use google guava's map maker to create a static final ConcurrentWeakReferenceIdentityHashmap with the following type: Map<Thread, Map<String, Object>> where the second map is a ConcurrentHashMap. That way you'd be pretty close to ThreadLocal except that you can iterate through the map.

Garbers answered 31/1, 2012 at 23:42 Comment(0)
G
-1

I'm disappointed in the quality of the answers received for this question; I have found my own solution.

I wrote my test case today, and found the only issue with the code in my question is the Boolean. Boolean is not mutable, so my list of references wasn't doing me any good. I had a look at this question, and changed my code to use AtomicBoolean, and now everything works as expected.

public class ThreadLocalFlag {

    private ThreadLocal<AtomicBoolean> flag;
    private List<AtomicBoolean> allValues = new ArrayList<AtomicBoolean>();

    public ThreadLocalFlag() {
        flag = new ThreadLocal<AtomicBoolean>() {
            @Override protected AtomicBoolean initialValue() {
                AtomicBoolean value = new AtomicBoolean();
                allValues.add(value);
                return value;
            }
        };
    }

    public boolean get() {
        return flag.get().get();
    }

    public void set(boolean value) {
        flag.get().set(value);
    }

    public void setAll(boolean value) {
        for (AtomicBoolean tlValue : allValues) {
            tlValue.set(value);
        }
    }
}

Test case:

public class ThreadLocalFlagTest {

    private static ThreadLocalFlag flag = new ThreadLocalFlag();
    private static boolean runThread = true;

    @AfterClass
    public static void tearDownOnce() throws Exception {
        runThread = false;
        flag = null;
    }

    /**
     * @throws Exception if there is any issue with the test
     */
    @Test
    public void testSetAll() throws Exception {
        startThread("ThreadLocalFlagTest-1", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-2", true);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-3", false);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            //ignore
        }
        startThread("ThreadLocalFlagTest-4", true);
        try {
            Thread.sleep(8000L); //watch the alternating values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(true);
        try {
            Thread.sleep(8000L); //watch the true values
        } catch (InterruptedException e) {
            //ignore
        }
        flag.setAll(false);
        try {
            Thread.sleep(8000L); //watch the false values
        } catch (InterruptedException e) {
            //ignore
        }
    }

    private void startThread(String name, boolean value) {
        Thread t = new Thread(new RunnableCode(value));
        t.setName(name);
        t.start();
    }

    class RunnableCode implements Runnable {

        private boolean initialValue;

        RunnableCode(boolean value) {
            initialValue = value;
        }

        @Override
        public void run() {
            flag.set(initialValue);
            while (runThread) {
                System.out.println(Thread.currentThread().getName() + ": " + flag.get());
                try {
                    Thread.sleep(4000L);
                } catch (InterruptedException e) {
                    //ignore
                }
            }
        }
    }
}
Guajardo answered 1/2, 2012 at 16:1 Comment(2)
this is not a complete solution since the List is not thread safe, if you think it is your code is either so extremely trivial that why you are doing this is silly overkill or you just do not understand what non-deterministic means, and you will soon when this introduces bizarre states because of things getting overwritten in a non-deterministic manner. You so called test case is fundamentally flawed. Anyone coming along in the future reading this should take this answer as a lesson what not to do!Fantast
@JarrodRoberson so what, just wrap list with synchronized () block on that list and it will work like a charm, whole solution will.Britishism

© 2022 - 2024 — McMap. All rights reserved.