Thread-safe circular buffer in Java

Consider a few web server instances running in parallel. Each server holds a reference to a single shared "Status keeper", whose role is keeping the last N requests from all servers.

For example (N=3):

Server a: "Request id = ABCD"        Status keeper=["ABCD"]
Server b: "Request id = XYZZ"        Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"        Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO"         Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR"         Status keeper=["1234", "FOO", "BAR"]
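The trace above is what a fixed-capacity ring produces when a full buffer overwrites its oldest slot. Below is a minimal single-threaded sketch that reproduces it. The class name `RingBuffer` is hypothetical, and the sketch deliberately ignores concurrency, which is the actual subject of the question.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fixed-capacity ring buffer; NOT thread-safe.
class RingBuffer<T> {
    private final Object[] slots;
    private int start = 0; // index of the oldest element
    private int size = 0;  // number of elements currently stored

    RingBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        slots = new Object[capacity];
    }

    // Appends an element; when the buffer is full, the oldest element is overwritten.
    void add(T element) {
        int end = (start + size) % slots.length;
        slots[end] = element;
        if (size < slots.length) {
            size++;
        } else {
            start = (start + 1) % slots.length; // the slot just written held the oldest element
        }
    }

    // Returns the contents from oldest to newest.
    @SuppressWarnings("unchecked")
    List<T> toList() {
        List<T> out = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            out.add((T) slots[(start + i) % slots.length]);
        }
        return out;
    }

    public static void main(String[] args) {
        RingBuffer<String> keeper = new RingBuffer<>(3);
        for (String id : new String[] {"ABCD", "XYZZ", "1234", "FOO", "BAR"}) {
            keeper.add(id);
            System.out.println(id + " -> " + keeper.toList());
        }
    }
}
```

Once the buffer is full, the write position coincides with the oldest element, so a single index advance turns "overwrite" into "evict oldest".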

At any point in time, the "Status keeper" might be called from a monitoring application that reads these last N requests for an SLA report.

What's the best way to implement this producer-consumer scenario in Java, giving the web servers higher priority than the SLA report?

CircularFifoBuffer seems to be the appropriate data structure to hold the requests, but I'm not sure what the optimal way to implement efficient concurrency is.

Hen asked 18/6, 2012 at 8:24 Comment(7)
Define "higher priority". What if the report has started reading the buffer? Should it break and start over if someone wants to write to it? Can that in turn lead to starvation? – Hallee
It should never starve and it should never be stopped, but it can wait a bit longer, meaning that its priority should slowly increase over time. – Hen
How many producers and how many consumers should the ring buffer have? I will post some code once you provide that information. – Dungdungan
Reading is rare, probably no more than once every few minutes. Writing is very frequent and might peak at hundreds of calls per second. I can tolerate rare discrepancies. – Hen
@Adam And how large is N typically? With the example N=3, I would not go with a central "status keeper" server approach, as 99% of the data sent to it is discarded. – Swearword
@Swearword Somewhere between 1,000 and 10,000. – Hen
With numbers that low, I would simply go for a synchronized CircularFifoBuffer, as @maydeTo suggests. You won't introduce any significant bottleneck with that. – Swearword
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());
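Two practical notes, with a JDK-only sketch. In Commons Collections 3.x the no-argument `CircularFifoBuffer()` constructor creates a buffer of size 32, so for N in the thousands the capacity should be passed explicitly. Also, as with the JDK's synchronized wrappers, each individual call on the wrapped buffer is synchronized but iteration is not, so a report that iterates should do so inside `synchronized (fifo) { ... }`. The class below (`SyncFifo` is a hypothetical name, not a library class) approximates the same behaviour with an `ArrayDeque` guarded by the object's own lock: eviction and insertion happen as one atomic step, and the reader copies the contents under the lock and builds the report from that copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only stand-in for a synchronized CircularFifoBuffer.
class SyncFifo<T> {
    private final int capacity;
    private final ArrayDeque<T> items;

    SyncFifo(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.items = new ArrayDeque<>(capacity);
    }

    // Writers: evict the oldest element (if full) and append, as one atomic step.
    synchronized void add(T element) {
        if (items.size() == capacity) {
            items.pollFirst();
        }
        items.addLast(element);
    }

    // Reader: copy under the lock, then build the report from the copy outside it.
    synchronized List<T> snapshot() {
        return new ArrayList<>(items);
    }

    public static void main(String[] args) throws InterruptedException {
        SyncFifo<String> fifo = new SyncFifo<>(1000);
        Thread[] servers = new Thread[4];
        for (int s = 0; s < servers.length; s++) {
            final int id = s;
            servers[s] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) fifo.add("server" + id + "-req" + i);
            });
            servers[s].start();
        }
        for (Thread t : servers) t.join();
        System.out.println(fifo.snapshot().size()); // 1000: only the newest N are kept
    }
}
```

The lock is held only for an O(1) add or a single O(N) copy, which matches the comments' view that N in the low thousands will not make this a bottleneck.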
Brigandage answered 21/6, 2012 at 12:48 Comment(8)
Doesn't really matter as long as the initialization code isn't racy. – Brigandage
Where is BufferUtils from? I tried using it from Apache, with "compile 'org.apache.commons:commons-collections4:4.1'" in my Gradle file, but it's not there... – Sudra
@androiddeveloper In Apache Commons Collections 4, CircularFifoBuffer has been replaced with CircularFifoQueue, which can be synchronized by wrapping it with QueueUtils. – Cronyism
@Cronyism I see. Why isn't there a concurrent solution, though? – Sudra
@androiddeveloper There is a solution that handles concurrency. As with the (newer) JDK collections, the basic Queue collection is unsynchronized for performance. If a collection is desired that handles concurrency (i.e., thread-safety), then the collection should be wrapped using the proper synchronized wrapper; see QueueUtils.synchronizedQueue(Queue<E> queue) in the case of the CircularFifoQueue (as my previous comment states). – Cronyism
@Cronyism A synchronized solution isn't good for concurrency... It's good just for thread safety... Concurrency means that multiple threads can perform "problematic" operations at the same time, without making other threads wait for them to finish. – Sudra
@androiddeveloper A synchronized solution is one solution intended to handle concurrency. It is not the only solution, but it is one solution. Apache Commons is just following the pattern used by the JDK in offering synchronized collection wrappers for use in thread safety. – Cronyism
@Cronyism Synchronization is a solution, yes, but it's also the most naive one... – Sudra
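To make the distinction in this exchange concrete, here is one hypothetical way (the class `LockFreeLastN` is not from any answer here) to let writers proceed without ever waiting for each other or for the report: each write claims a unique sequence number from an `AtomicLong` and stores into slot `sequence % N` of an `AtomicReferenceArray`. It is a sketch, not a vetted algorithm. A snapshot taken while writes are in flight can be slightly off, and a writer delayed long enough can overwrite a newer entry with an older one. That is acceptable only because the OP said rare discrepancies are tolerable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical "last N" recorder in which writers never take a lock.
class LockFreeLastN<T> {
    private final AtomicReferenceArray<T> slots;
    private final AtomicLong sequence = new AtomicLong();

    LockFreeLastN(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        slots = new AtomicReferenceArray<>(capacity);
    }

    // Each writer claims a unique sequence number and overwrites the matching slot.
    void add(T element) {
        long seq = sequence.getAndIncrement();
        slots.set((int) (seq % slots.length()), element);
    }

    // Best-effort snapshot, oldest first. With concurrent writers a slot may hold an
    // older or newer element than its position suggests, or still be empty (null)
    // if a writer has claimed it but not yet stored into it.
    List<T> snapshot() {
        long end = sequence.get();
        long begin = Math.max(0, end - slots.length());
        List<T> out = new ArrayList<>();
        for (long s = begin; s < end; s++) {
            T e = slots.get((int) (s % slots.length()));
            if (e != null) out.add(e);
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        LockFreeLastN<String> last = new LockFreeLastN<>(3);
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) last.add("a" + i); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) last.add("b" + i); });
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(last.snapshot()); // three entries: after the joins every slot has been written
    }
}
```

The trade-off is the one debated above: writers pay only one atomic increment and one volatile store, and the reader gives up strict consistency instead of making anyone wait.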

Here's a lock-free ring buffer implementation. It implements a fixed-size buffer; there is no FIFO functionality. I would suggest you store a Collection of requests for each server instead. That way your report can do the filtering rather than getting your data structure to filter.

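import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
// Separator (used in toString) is a small helper of the author's that is not shown.

/**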
 * Container
 * ---------
 * 
 * A lock-free container that offers a close-to O(1) add/remove performance.
 * 
 */
public class Container<T> implements Iterable<T> {

  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  // TESTING {
  AtomicLong totalAdded = new AtomicLong(0);
  AtomicLong totalFreed = new AtomicLong(0);
  AtomicLong totalSkipped = new AtomicLong(0);

  private void resetStats() {
    totalAdded.set(0);
    totalFreed.set(0);
    totalSkipped.set(0);
  }
  // TESTING }

  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }

  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
    // Clear stats.
    resetStats();
  }

  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    totalAdded.incrementAndGet();
    return getFree().attach(element);
  }

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // Keep count of skipped.
    totalSkipped.addAndGet(skipped);
    if (skipped < capacity) {
      // Advance the head past the node we just claimed. This is a plain set, not a CAS:
      // if another thread overwrites it concurrently, only the start of the next search changes.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }

  // Mark it free.
  public void remove(Node<T> it, T element) {
    totalFreed.incrementAndGet();
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {

    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;

    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }

    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }

    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }

    public T get () {
      return element;
    }

    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }

  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.

    Node<T> it;
    int limit = 0;
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    public boolean hasNext() {
      // Made into a `while` loop to fix issue reported by @Nim in code review
      while (next == null && limit > 0) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }

    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }

    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    Separator comma = new Separator(",");
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element.
          s.append(comma.sep()).append(e.toString());
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}

Note that this is close to O(1) put and get. A Separator just emits "" the first time around and then its parameter from then on.
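`Separator` itself is not shown in the answer. Going only by the description above, a hypothetical reconstruction could look like this; the `reset()` method is included because the test code calls `sep.reset()`.

```java
// Hypothetical reconstruction of the Separator helper: sep() returns ""
// on its first call and the separator string on every later call.
class Separator {
    private final String separator;
    private boolean first = true;

    Separator(String separator) {
        this.separator = separator;
    }

    String sep() {
        if (first) {
            first = false;
            return "";
        }
        return separator;
    }

    // Start over, so the next sep() returns "" again.
    void reset() {
        first = true;
    }

    public static void main(String[] args) {
        Separator comma = new Separator(",");
        StringBuilder sb = new StringBuilder();
        for (String s : new String[] {"One", "Two", "Three"}) {
            sb.append(comma.sep()).append(s);
        }
        System.out.println(sb); // One,Two,Three
    }
}
```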

Edit: Added test methods.

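// ***** Following only needed for testing: these members go inside the Container class. *****
// They also need java.io.IOException and java.util.ArrayList imported, and they use the
// author's NamedFileOutput logging helper, which is not shown.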
private static boolean Debug = false;
private final static String logName = "Container.log";
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

private static synchronized void log(boolean toStdoutToo, String s) {
  if (Debug) {
    if (toStdoutToo) {
      System.out.println(s);
    }
    log(s);
  }
}

private static synchronized void log(String s) {
  if (Debug) {
    try {
      log.writeLn(logName, s);
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}
static volatile boolean testing = true;

// Tester object to exercise the container.
static class Tester<T> implements Runnable {
  // My name.

  T me;
  // The container I am testing.
  Container<T> c;

  public Tester(Container<T> container, T name) {
    c = container;
    me = name;
  }

  private void pause() {
    try {
      Thread.sleep(0);
    } catch (InterruptedException ex) {
      testing = false;
    }
  }

  public void run() {
    // Spin on add/remove until stopped.
    while (testing) {
      // Add it.
      Node<T> n = c.add(me);
      log("Added " + me + ": " + c.toString());
      pause();
      // Remove it.
      c.remove(n, me);
      log("Removed " + me + ": " + c.toString());
      pause();
    }
  }
}
static final String[] strings = {
  "One", "Two", "Three", "Four", "Five",
  "Six", "Seven", "Eight", "Nine", "Ten"
};
static final int TEST_THREADS = Math.min(10, strings.length);

public static void main(String[] args) throws InterruptedException {
  Debug = true;
  log.delete(logName);
  Container<String> c = new Container<String>(10);

  // Simple add/remove
  log(true, "Simple test");
  Node<String> it = c.add(strings[0]);
  log("Added " + c.toString());
  c.remove(it, strings[0]);
  log("Removed " + c.toString());

  // Capacity test.
  log(true, "Capacity test");
  ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
  // Fill it.
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  // Add one more.
  try {
    c.add("Wafer thin mint!");
  } catch (IllegalStateException ise) {
    log("Full!");
  }
  c.clear();
  log("Empty: " + c.toString());

  // Iterate test.
  log(true, "Iterator test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
  }
  StringBuilder all = new StringBuilder ();
  Separator sep = new Separator(",");
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("All: "+all);
  for (int i = 0; i < strings.length; i++) {
    c.remove(nodes.get(i), strings[i]);
  }
  sep.reset();
  all.setLength(0);
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("None: " + all.toString());

  // Multiple add/remove
  log(true, "Multi test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  log("Filled " + c.toString());
  for (int i = 0; i < strings.length - 1; i++) {
    c.remove(nodes.get(i), strings[i]);
    log("Removed " + strings[i] + " " + c.toString());
  }
  c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
  log("Empty " + c.toString());

  // Multi-threaded add/remove
  log(true, "Threads test");
  c.clear();
  for (int i = 0; i < TEST_THREADS; i++) {
    Thread t = new Thread(new Tester<String>(c, strings[i]));
    t.setName("Tester " + strings[i]);
    log("Starting " + t.getName());
    t.start();
  }
  // Wait for 10 seconds.
  long stop = System.currentTimeMillis() + 10 * 1000;
  while (System.currentTimeMillis() < stop) {
    Thread.sleep(100);
  }
  // Stop the testers.
  testing = false;
  // Wait some more.
  Thread.sleep(1 * 100);
  // Get stats.
  double added = c.totalAdded.doubleValue();
  double skipped = c.totalSkipped.doubleValue();
  //double freed = c.freed.doubleValue();
  log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}
Thurible answered 18/6, 2012 at 10:20 Comment(5)
Do you have any formal verification for the correctness of this algorithm? Lock-free data structures are notoriously hard to get right, unless you avoid reusing nodes... – Slobbery
@Slobbery How 'formal' do you need? The primary algorithm is in the getFree method, which selects a free node and marks it for use. It is quite simple and its correctness should be self-evident. I have added my test methods. Perhaps they will help. – Thurible
The kind of 'formal' that published and peer-reviewed algorithms have. I have worked with lock-free data structures extensively and they can be extremely difficult to get right. There are just too many corner cases... – Slobbery
Sorry but I couldn't resist ... there are no corner cases in a ring. <grin> ... Seriously though, I have not formally published it (except here) or had it peer-reviewed. If you wish I will post it on Code Review and see what they say. I have been using it in a live environment for quite a while now and it seems to be reliable. I know there is some sloppiness in the statistics it collects but they are not part of the algorithm. – Thurible
Insertion and deletion look fine, since the Node.free flag is CAS'ed and this also ensures inter-thread visibility of Node.element. However, there are probably design issues which show up with the iterator: since free and element are independent fields, UsedNodesIterator cannot avoid race issues when reading. I believe most java.util.concurrent collections combine 'element' and 'free' state into a single field to allow race-free reading, and disallow null elements to assist this. – Gatlin
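The last comment's suggestion can be sketched as follows (hypothetical class `SlotRing`, not the answer's code): each slot is a single `AtomicReferenceArray` element in which `null` means free, so claiming a slot and publishing the element are one CAS, and a reader can never observe a slot that is marked used but has no element yet. As the comment notes, this requires disallowing null elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical fixed-size container in which "free" and "element" are one field:
// a slot is free exactly when it holds null.
class SlotRing<T> {
    private final AtomicReferenceArray<T> slots;
    private final AtomicInteger hint = new AtomicInteger(); // where the next search starts

    SlotRing(int capacity) {
        slots = new AtomicReferenceArray<>(capacity);
    }

    // Claims a free slot and publishes the element in a single CAS; returns the slot index.
    int add(T element) {
        if (element == null) throw new NullPointerException("null elements are not allowed");
        int n = slots.length();
        int start = Math.floorMod(hint.get(), n);
        for (int i = 0; i < n; i++) {
            int idx = (start + i) % n;
            if (slots.compareAndSet(idx, null, element)) {
                hint.set(idx + 1); // only a search hint; races here are harmless
                return idx;
            }
        }
        throw new IllegalStateException("Capacity exhausted.");
    }

    // Frees the slot only if it still holds this exact element (identity comparison).
    void remove(int idx, T element) {
        if (!slots.compareAndSet(idx, element, null)) {
            throw new IllegalStateException("Slot does not hold that element.");
        }
    }

    // Reads each slot once; every non-null value seen is a fully published element.
    List<T> elements() {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < slots.length(); i++) {
            T e = slots.get(i);
            if (e != null) out.add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        SlotRing<String> ring = new SlotRing<>(3);
        int a = ring.add("A");
        ring.add("B");
        ring.remove(a, "A");
        System.out.println(ring.elements()); // [B]
    }
}
```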

Maybe you want to look at the Disruptor concurrent programming framework.

  • Find a paper describing the alternatives and the design, as well as a performance comparison with java.util.concurrent.ArrayBlockingQueue, here: pdf
  • Consider reading the first three articles from BlogsAndArticles

If the library is too much, stick to java.util.concurrent.ArrayBlockingQueue.
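`ArrayBlockingQueue` is bounded but does not evict on its own: `put` blocks and `offer` returns `false` when the queue is full. If it is used as a "last N" buffer, a writer can make room itself by polling the oldest element and retrying, as in this sketch (`DropOldest.record` is a hypothetical helper name). Each call is thread-safe, but the poll-then-offer pair is not atomic, so under contention another writer can take the freed slot and force an extra retry.

```java
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;

class DropOldest {
    // Inserts without ever blocking: while the queue is full, discard its oldest element.
    static <T> void record(ArrayBlockingQueue<T> queue, T element) {
        while (!queue.offer(element)) {
            queue.poll(); // may return null if a reader emptied the queue meanwhile; we just retry
        }
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> lastN = new ArrayBlockingQueue<>(3);
        for (String id : new String[] {"ABCD", "XYZZ", "1234", "FOO", "BAR"}) {
            record(lastN, id);
        }
        // The reader copies the contents without removing anything.
        System.out.println(new ArrayList<>(lastN)); // [1234, FOO, BAR]
    }
}
```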

Interleaf answered 28/6, 2012 at 11:39 Comment(0)

I would have a look at ArrayDeque or, for a more concurrent implementation, the Disruptor library, which is one of the most sophisticated/complex ring buffers in Java.

An alternative is to use an unbounded queue, which is more concurrent because the producer never needs to wait for the consumer; see Java Chronicle.

Unless your needs justify the complexity, an ArrayDeque may be all you need.

Carbonization answered 18/6, 2012 at 8:54 Comment(7)
One important issue: ArrayDeque is not size-limited. It uses a circular array, true, but it will resize to accommodate more elements as necessary. The OP would have to manually pop() an element before inserting a new one once it is full, all while also explicitly maintaining thread safety... – Slobbery
If you need it to be size-limited, you can use ArrayBlockingQueue. – Carbonization
ArrayBlockingQueue limits its size by blocking until an element is removed. As far as I can tell, the OP wants the queue to implicitly drop/overwrite the oldest element, only keeping the latest N elements. – Slobbery
You could be right; that doesn't sound like safe behaviour IMHO. Correctness is usually more important than performance, and randomly discarding items in a queue in a non-reproducible manner isn't the sort of thing I would suggest anyone do. – Carbonization
It's not how I would do it either. I'd rather store everything in a database and explicitly purge old records. That would have the added benefit of allowing for more complex queries that might be needed in the future... – Slobbery
"Disruptor library which is the most sophisticated/complex ring buffer in Java": that's a point of view, I presume. The OP doesn't need hand-off at all, so the Disruptor is not a fit for this case. Even with hand-off, flat combining will outperform a producer-contended Disruptor (though it may not be strictly FIFO among the producers). – Dungdungan
@Dungdungan I have qualified that statement. I agree it's not the right solution in every case. – Carbonization

Also have a look at java.util.concurrent.

Blocking queues will block until there is something to consume or (optionally) space to produce:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

ConcurrentLinkedQueue is non-blocking and uses a slick algorithm that allows a producer and a consumer to be active concurrently:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
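The practical difference between the two, in a small sketch using JDK classes only (the capacity of 1 is chosen just to make "full" easy to reach): a bounded `BlockingQueue` rejects `offer` when full (where `put` would block) and can wait for an element with a timed `poll`, while `ConcurrentLinkedQueue` is unbounded, so `offer` always succeeds and `poll` returns `null` immediately when the queue is empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

class QueueBehaviour {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
        System.out.println(bounded.offer("a"));  // true
        System.out.println(bounded.offer("b"));  // false: full, and offer never waits
        System.out.println(bounded.poll(10, TimeUnit.MILLISECONDS)); // a
        System.out.println(bounded.poll(10, TimeUnit.MILLISECONDS)); // null after waiting 10 ms

        ConcurrentLinkedQueue<String> unbounded = new ConcurrentLinkedQueue<>();
        System.out.println(unbounded.offer("a")); // true: always succeeds
        System.out.println(unbounded.poll());     // a
        System.out.println(unbounded.poll());     // null: empty, returns immediately
    }
}
```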

Horsemanship answered 21/6, 2012 at 17:22 Comment(0)

Hazelcast's Queue offers almost everything you ask for, but doesn't support circularity. From your description, though, I am not sure you actually need it.

Musket answered 25/6, 2012 at 10:22 Comment(0)

If it were me, I would use the CircularFifoBuffer as you indicated, and synchronize on the buffer when writing (add). When the monitoring application wants to read the buffer, synchronize on the buffer, and then copy or clone it to use for reporting.

This suggestion is predicated on the assumption that copying/cloning the buffer into a new object takes little time. If there is a large number of elements and copying is slow, then this is not a good idea.

Pseudocode example:

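// Needs java.util.ArrayList and java.util.Collection, plus a field such as:
// private final Buffer buffer = new CircularFifoBuffer(N); // Commons Collections 3.x (raw types)

public void writeRequest(String requestID) {
    synchronized (buffer) {
        buffer.add(requestID);
    }
}

// Copy under the lock; the report then iterates over its private copy.
// (clone() returns Object, so a copy constructor is used instead.)
@SuppressWarnings("unchecked")
public Collection<String> getRequests() {
    synchronized (buffer) {
        return new ArrayList<String>(buffer);
    }
}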
Unstrap answered 27/6, 2012 at 17:22 Comment(0)

Since you specifically ask to give writers (that is web servers) higher priority than the reader (that is monitoring), I would suggest the following design.

Web servers add request information to a concurrent queue, which is read by a dedicated thread. That thread adds the requests to a thread-confined (and therefore unsynchronized) queue that overwrites the oldest element, such as Guava's EvictingQueue or Commons Collections' CircularFifoQueue. After processing each request, the same thread checks a flag that indicates whether a report has been requested and, if so, produces a report from all elements present in its queue.
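A minimal sketch of that design, under assumptions that are not in the answer: `StatusKeeper`, `record` and `report` are hypothetical names, a `LinkedBlockingQueue` stands in for "a concurrent queue", an `ArrayDeque` capped by hand stands in for EvictingQueue/CircularFifoQueue (to stay JDK-only), and the report "flag" is a `CompletableFuture` that the worker completes with a copy. One deliberate deviation: before answering a report, the worker drains whatever is already queued, so the report includes every request recorded before it was asked for. It also assumes a single monitoring thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-consumer status keeper: web-server threads only enqueue,
// and one worker thread owns the "last N" deque.
class StatusKeeper {
    private final int capacity;
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final ArrayDeque<String> lastN = new ArrayDeque<>(); // touched only by the worker
    private final AtomicReference<CompletableFuture<List<String>>> reportRequest =
            new AtomicReference<>();

    StatusKeeper(int capacity) {
        this.capacity = capacity;
        Thread worker = new Thread(this::drainLoop, "status-keeper");
        worker.setDaemon(true); // do not keep the JVM alive just for this thread
        worker.start();
    }

    // Web-server side: never blocks, because the queue is unbounded.
    void record(String requestId) {
        incoming.add(requestId);
    }

    // Monitoring side: post a request and wait for the worker to answer it.
    // Assumes one monitoring thread; a concurrent second caller would replace the request.
    List<String> report() {
        CompletableFuture<List<String>> answer = new CompletableFuture<>();
        reportRequest.set(answer);
        return answer.join();
    }

    private void keep(String requestId) {
        if (lastN.size() == capacity) {
            lastN.pollFirst(); // evict the oldest
        }
        lastN.addLast(requestId);
    }

    private void drainLoop() {
        try {
            while (true) {
                // Time out regularly so a report is answered even when no requests arrive.
                String id = incoming.poll(50, TimeUnit.MILLISECONDS);
                if (id != null) {
                    keep(id);
                }
                // After each request (or timeout), check whether a report was requested.
                CompletableFuture<List<String>> pending = reportRequest.getAndSet(null);
                if (pending != null) {
                    String queued;
                    while ((queued = incoming.poll()) != null) {
                        keep(queued);
                    }
                    pending.complete(new ArrayList<>(lastN));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // leave the loop and let the thread end
        }
    }

    public static void main(String[] args) {
        StatusKeeper keeper = new StatusKeeper(3);
        for (String id : new String[] {"ABCD", "XYZZ", "1234", "FOO", "BAR"}) {
            keeper.record(id);
        }
        System.out.println(keeper.report()); // [1234, FOO, BAR]
    }
}
```

Writers never contend with the reader here: the only shared structures are the queue and the atomic reference, and the report is built by the same thread that owns the deque.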

Diplomat answered 28/5, 2020 at 19:37 Comment(0)
