I would like to know the optimal number of threads I can run. Normally, this equals Runtime.getRuntime().availableProcessors().
However, on a CPU that supports hyper-threading the returned number is twice the number of physical cores. Now, for some tasks hyper-threading helps, but for others it does nothing. In my case, I suspect it does nothing, so I would like to know whether I have to divide the number returned by Runtime.getRuntime().availableProcessors() by two.
For that I have to determine whether the CPU supports hyper-threading. Hence my question: how can I do this in Java?
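To make the question concrete, here is the kind of heuristic I have in mind (just a sketch of my current idea, not a working solution: shelling out to wmic and parsing its NumberOfCores column is my assumption of how to get at the physical core count, and it is obviously Windows-only):

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class CpuInfo {
    // Logical processors as the JVM sees them (includes hyper-threading siblings).
    public static int logicalProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Physical core count, estimated by asking WMI via the wmic command-line tool.
    // Windows-only and unverified on anything but my own machine.
    public static int physicalCoresWindows() throws Exception {
        Process p = Runtime.getRuntime().exec(new String[] {"wmic", "cpu", "get", "NumberOfCores"});
        BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()));
        int cores = 0;
        String line;
        while ((line = r.readLine()) != null) {
            line = line.trim();
            if (line.matches("\\d+")) {
                cores += Integer.parseInt(line); // one line per physical CPU package
            }
        }
        p.waitFor();
        return cores > 0 ? cores : logicalProcessors(); // fall back if parsing failed
    }
}

If there is a cleaner or portable way to detect this, that is exactly what I am after.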
Thanks.
EDIT
OK, I have benchmarked my code. Here is my environment:
- Lenovo ThinkPad W510 (i.e. an i7 CPU with 4 cores and hyper-threading), 16G of RAM
- Windows 7
- 84 zipped CSV files, with zipped sizes ranging from 16M to 105M
- All the files are read one by one in the main thread - no multithreaded access to the HD.
- Each CSV file row contains some data, which is parsed, and a fast context-free test determines whether the row is relevant.
- Each relevant row contains two doubles (representing longitude and latitude, for the curious), which are coerced into a single Long (see the sketch below), which is then stored in a shared hash set.
Thus the worker threads do not read anything from the HD, but they do occupy themselves with unzipping and parsing the contents (using the opencsv library).
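The coercion into a single Long mentioned above looks roughly like this (a sketch of the idea rather than my exact code: I keep float precision for each coordinate and store the two 32-bit patterns in the two halves of the long):

// Pack a (longitude, latitude) pair into one long: float precision per coordinate,
// longitude bits in the high half, latitude bits in the low half.
static long packCoordinates(double longitude, double latitude) {
    long lonBits = Float.floatToIntBits((float) longitude) & 0xFFFFFFFFL;
    long latBits = Float.floatToIntBits((float) latitude) & 0xFFFFFFFFL;
    return (lonBits << 32) | latBits;
}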
Below is the code, w/o the boring details:
public void work(File dir) throws IOException, InterruptedException {
    Set<Long> allCoordinates = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
    int n = 6;
    // NO WAITING QUEUE!
    ThreadPoolExecutor exec = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
    StopWatch sw1 = new StopWatch();
    StopWatch sw2 = new StopWatch();
    sw1.start();
    sw2.start();
    sw2.suspend();
    for (WorkItem wi : m_workItems) {
        for (File file : dir.listFiles(wi.fileNameFilter)) {
            MyTask task;
            try {
                sw2.resume();
                // The only reading from the HD occurs here:
                task = new MyTask(file, m_coordinateCollector, allCoordinates, wi.headerClass, wi.rowClass);
                sw2.suspend();
            } catch (IOException exc) {
                System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));
                continue;
            }
            boolean retry = true;
            while (retry) {
                int count = exec.getActiveCount();
                try {
                    // submit() fails if the maximum number of worker threads has been created and all are busy.
                    // This prevents us from loading all the files into memory and getting an OOM exception.
                    exec.submit(task);
                    retry = false;
                } catch (RejectedExecutionException exc) {
                    // Wait for any worker thread to finish
                    while (exec.getActiveCount() == count) {
                        Thread.sleep(100);
                    }
                }
            }
        }
    }
    exec.shutdown();
    exec.awaitTermination(1, TimeUnit.HOURS);
    sw1.stop();
    sw2.stop();
    System.out.println(String.format("Max concurrent threads = %d", n));
    System.out.println(String.format("Total file count = %d", m_stats.getFileCount()));
    System.out.println(String.format("Total lines = %d", m_stats.getTotalLineCount()));
    System.out.println(String.format("Total good lines = %d", m_stats.getGoodLineCount()));
    System.out.println(String.format("Total coordinates = %d", allCoordinates.size()));
    System.out.println(String.format("Overall elapsed time = %d sec, excluding I/O = %d sec", sw1.getTime() / 1000, (sw1.getTime() - sw2.getTime()) / 1000));
}
public class MyTask<H extends CsvFileHeader, R extends CsvFileRow<H>> implements Runnable {
    private final byte[] m_buffer;
    private final String m_name;
    private final CoordinateCollector m_coordinateCollector;
    private final Set<Long> m_allCoordinates;
    private final Class<H> m_headerClass;
    private final Class<R> m_rowClass;

    public MyTask(File file, CoordinateCollector coordinateCollector, Set<Long> allCoordinates,
                  Class<H> headerClass, Class<R> rowClass) throws IOException {
        m_coordinateCollector = coordinateCollector;
        m_allCoordinates = allCoordinates;
        m_headerClass = headerClass;
        m_rowClass = rowClass;
        m_name = file.getName();
        // The whole (still zipped) file is read into memory here, on the main thread.
        m_buffer = Files.toByteArray(file);
    }

    @Override
    public void run() {
        try {
            // Unzipping and parsing happen on the worker thread.
            m_coordinateCollector.collect(m_name, m_buffer, m_allCoordinates, m_headerClass, m_rowClass);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
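As an aside, the retry/sleep loop around submit() above is just my way of getting backpressure with no waiting queue. A bounded queue plus the standard CallerRunsPolicy would give a similar throttling effect without polling (a sketch of the alternative, not what I actually benchmarked):

// Bounded queue: at most 2*n tasks (and their in-memory file buffers) can wait at a time.
// CallerRunsPolicy makes the submitting (main) thread run a task itself when the queue is
// full, which throttles how fast new files are read and buffered.
ThreadPoolExecutor exec = new ThreadPoolExecutor(
        n, n, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(2 * n),
        new ThreadPoolExecutor.CallerRunsPolicy());

The catch is that the main thread would then occasionally do a chunk of parsing itself, which would distort the I/O-only stopwatch, so I kept the polling version for the measurements below.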
Please find the results below (I have slightly trimmed the output to omit the repeating parts):
Max concurrent threads = 4
Total file count = 84
Total lines = 56395333
Total good lines = 35119231
Total coordinates = 987045
Overall elapsed time = 274 sec, excluding I/O = 266 sec
Max concurrent threads = 6
Overall elapsed time = 218 sec, excluding I/O = 209 sec
Max concurrent threads = 7
Overall elapsed time = 209 sec, excluding I/O = 199 sec
Max concurrent threads = 8
Overall elapsed time = 201 sec, excluding I/O = 192 sec
Max concurrent threads = 9
Overall elapsed time = 198 sec, excluding I/O = 186 sec
You are free to draw your own conclusions, but mine is that hyper-threading does improve performance in my concrete case: the compute-only time drops from 266 sec with 4 threads (one per physical core) to about 186-192 sec with 8-9 threads, roughly a 1.4x gain. Also, having 6 worker threads seems to be the right choice for this task and my machine.