Using ArrayBlockingQueue makes the process slower
I recently used an ArrayBlockingQueue for my multi-threaded process, but it seemed to slow things down rather than speed them up. Can you help me out? I'm basically importing a file (about 300k rows), parsing the rows, and storing them in the DB.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.json.JSONObject; // or whichever JSON library the project uses

public class CellPool {
  private static class RejectedHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
      System.err.println(Thread.currentThread().getName() + " execution rejected: " + arg0);
    }
  }

  private static class Task implements Runnable {
    private JSONObject obj;

    public Task(JSONObject obj) {
      this.obj = obj;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(1);
        runThis(obj);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    public void runThis(JSONObject obj) {
        //where the rows are parsed and stored in the DB, etc
    }
  }

  public static void executeCellPool(String filename) throws InterruptedException {
    // fixed pool fixed queue
    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(300000, true);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(90, 100, 1, TimeUnit.MINUTES, queue);

    DataSet ds = CommonDelimitedParser.getDataSet(filename);
    final String[] colNames = ds.getColumns();
    while (ds.next()) {
        JSONObject obj = new JSONObject();
        //some JSON object
        Task t = new Task(obj);
        executor.execute(t);
    }
  }

}

Boarder answered 15/12, 2015 at 6:36 Comment(4)
Slowed down relative to what? Using another type of BlockingQueue? – Apollinaire
Please get rid of the sleep. – Intracellular
Why? I suggest you get rid of the queue, the threads, the executor, all that, and do it all in a single thread, as a batch. You don't need a queue of 300,000 items and 90-100 threads for this. – Grier
Thread.sleep(1); makes no sense in your code. You should always avoid using sleep if you can. – Bayreuth

tl;dr Big queue sizes can have a negative impact, as can large thread counts. Ideally, you want your consumers and producers to be working at a similar rate.

The reason the addition of the queue is causing issues is that you're using a very large queue (which is not necessary) that takes up resources. Typically, a blocking queue blocks producers when there is no space left in the queue and blocks consumers when there are no objects left in the queue. By creating such a large one of a fixed size, Java assigns that space in memory even though you almost certainly aren't using all of it. It would be more effective to force your producer to wait for space in the queue to clear up when your consumers are consuming too slowly. You don't need to store all of the lines from your file in the queue at the same time.

Thread pool executor queues are discussed in the ThreadPoolExecutor javadoc:

Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.

Your large thread count of 90, combined with your very large queue size of 300,000, is most likely using a lot of memory and incurring additional thread-scheduling overhead. I would drop both of them considerably. I don't know what hardware you are running on, but since it sounds like you're writing an IO-intensive program, I would try about double the number of threads your CPU can handle, and play around with sizes for your blocking queue to see what works (note: I haven't researched this; it's based on my experience running queues and executors. Happy for others to suggest a different count!).
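As a rough, hedged starting point for that "double the CPU count" heuristic (the multiplier of 2 is an assumption for IO-bound work, not a tuned value), you can derive the pool size at runtime:

```java
public class PoolSizing {
    public static void main(String[] args) {
        // Rough heuristic only: IO-bound tasks spend time blocked, so they
        // tolerate more threads than there are cores. Tune for your workload.
        int cores = Runtime.getRuntime().availableProcessors();
        int poolSize = cores * 2;
        System.out.println("poolSize=" + poolSize);
    }
}
```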

Of note, though, is that if your queue is too small, the execute() method will throw a RejectedExecutionException when it fails to add a task (the default rejection policy). One way of monitoring the queue is to check its remaining capacity before scheduling a task. You can do this by calling:

executor.getQueue().remainingCapacity()

Don't use the executor.getQueue() method to alter the queue in any way; use it only for monitoring.

An alternative is to use an unbounded queue, such as a LinkedBlockingQueue without a defined capacity. That way, you won't need to deal with queue sizes. However, if your producers run much faster than your consumers, you will once again have the issue of consuming too much memory.
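A minimal sketch of the "make the producer wait" approach described above: keep a small bounded queue and install CallerRunsPolicy, so that when the queue is full the submitting thread runs the task itself instead of getting a rejection. The pool and queue sizes here are illustrative, not tuned values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                8, 8, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(100),
                // When the queue is full, run the task on the submitting
                // thread, which naturally throttles the producer.
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < 10_000; i++) {
            executor.execute(processed::incrementAndGet);
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("processed=" + processed.get());
    }
}
```

Because the producer is slowed to match the consumers, memory stays bounded by the small queue rather than by the size of the input file.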

Also, kostya is right, a JDBC batch insert would be faster.

Dybbuk answered 15/12, 2015 at 10:3 Comment(1)
Thanks, AndyN and @kostya for this! I'm quite new to Java. I tried batch insert, and it was significantly faster. :) – Boarder

If you want to persist records from a file into a relational database as fast as possible, you should use a JDBC batch insert rather than inserting records one at a time.
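A sketch of what that looks like with the standard java.sql API: rows are accumulated with addBatch() and sent to the database in one round trip per batch. The JDBC URL, credentials, table, and column names are placeholders; the batch size of 1000 is an assumption to tune for your database.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchInsert {
    // Placeholder connection details - substitute your own.
    static final String URL = "jdbc:mysql://localhost:3306/mydb";
    static final int BATCH_SIZE = 1000;

    public static void insertRows(List<String[]> rows) throws SQLException {
        try (Connection conn = DriverManager.getConnection(URL, "user", "password");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO cells (col_a, col_b) VALUES (?, ?)")) {
            conn.setAutoCommit(false);       // commit per batch, not per row
            int count = 0;
            for (String[] row : rows) {
                ps.setString(1, row[0]);
                ps.setString(2, row[1]);
                ps.addBatch();               // accumulate instead of executing
                if (++count % BATCH_SIZE == 0) {
                    ps.executeBatch();       // one round trip for the whole batch
                    conn.commit();
                }
            }
            ps.executeBatch();               // flush the final partial batch
            conn.commit();
        }
    }
}
```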

Kneedeep answered 15/12, 2015 at 7:9 Comment(0)
