I'm trying to demonstrate a simple clustering algorithm. Let's say I find k items in my RDD that represent a cluster. The problem is that when I try to take() them to the driver, I get a warning saying "block locks were not released by tid". This also leads to poor performance: 20 iterations of the code below take 11 seconds on 4 cores with a 2 MB dataset, which is far worse than my serial version. The good thing is that I still get the correct results. Please take a look:
First, I take one item (any item will do):
List<Tuple2<Long, Instance>> selectedTuple = dataset.take(1);
Instance selected = selectedTuple.get(0)._2;
Then I create an object that finds candidate items for my cluster based on the item I just selected. For the record, it implements PairFlatMapFunction:
NCPComparator comp = new NCPComparator(meta, selected);
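A function passed to mapPartitionsToPair receives an iterator over one whole partition and returns an iterator of key/value pairs. The plain-Java sketch below mirrors that shape so it runs without Spark; it is not the actual NCPComparator. Assumptions: java.util.Map.Entry stands in for scala.Tuple2, a double[] feature vector stands in for Instance, the class name CandidateScorer is hypothetical, and squared Euclidean distance is only a placeholder for the NCP cost, whose real definition is not shown here.

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java analogue of NCPComparator. It mirrors the shape of
// PairFlatMapFunction<Iterator<Tuple2<Long, Instance>>, Long, Double>, but uses
// Map.Entry instead of Tuple2 and double[] instead of Instance.
public class CandidateScorer implements Serializable {
    private final double[] selected; // the item picked with take(1)

    public CandidateScorer(double[] selected) {
        this.selected = selected.clone();
    }

    // Called once per partition: emits (id, cost) for every item in it.
    public Iterator<Map.Entry<Long, Double>> call(Iterator<Map.Entry<Long, double[]>> partition) {
        List<Map.Entry<Long, Double>> out = new ArrayList<>();
        while (partition.hasNext()) {
            Map.Entry<Long, double[]> item = partition.next();
            out.add(new SimpleEntry<>(item.getKey(), cost(item.getValue())));
        }
        return out.iterator();
    }

    // Assumed cost: squared Euclidean distance to the selected item
    // (a placeholder for the real NCP computation).
    private double cost(double[] x) {
        double sum = 0.0;
        for (int i = 0; i < x.length; i++) {
            double d = x[i] - selected[i];
            sum += d * d;
        }
        return sum;
    }
}
```

In Spark, fields such as selected (and meta in the real class) are serialized and shipped to the executors together with the function object.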
The next step finds some candidates:
JavaPairRDD<Long, Double> candidates = this.dataset.mapPartitionsToPair(comp);
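Conceptually, mapPartitionsToPair invokes the supplied function once per partition rather than once per element, and concatenates the emitted pairs into a new pair RDD. It is also lazy: nothing runs until an action such as takeOrdered triggers a job, so anything logged by the tasks shows up during that action rather than at this line. The single-threaded sketch below models only the per-partition calling pattern; the class LocalPartitions and its mapPartitions method are hypothetical and ignore distribution, scheduling, and caching.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Local, single-threaded model of mapPartitionsToPair: the function runs once
// per partition and all emitted elements are concatenated in partition order.
public class LocalPartitions {
    public static <T, R> List<R> mapPartitions(List<List<T>> partitions,
                                               Function<Iterator<T>, Iterator<R>> f) {
        List<R> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            // One call per partition, receiving an iterator over all its items.
            Iterator<R> out = f.apply(partition.iterator());
            while (out.hasNext()) {
                result.add(out.next());
            }
        }
        return result;
    }
}
```

Per-partition setup (for example, preparing data derived from the selected item) is therefore paid once per partition instead of once per element.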
But now I need the k pairs with the lowest Double values, so I implemented a TupleComparator. Then I invoke takeOrdered:
List<Tuple2<Long, Double>> groupList = candidates.takeOrdered(k, new TupleComparator());
At this point, that warning appears. The same goes for invoking top(). And since the algorithm is iterative, the warning appears in every iteration. I am not showing you the rest of the code. You guessed it: it's more than trivial :-)
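For reference, takeOrdered(k, comp) returns the k smallest elements under the comparator in ascending order, while top(k, comp) returns the k largest in descending order, i.e. the same as takeOrdered with the comparator reversed. Because the comparator is used inside tasks on the executors, it must be serializable. Roughly, Spark keeps the k best candidates of each partition in a bounded priority queue and merges those results on the driver. The plain-Java sketch below imitates that with a size-k max-heap; ValueComparator is a hypothetical stand-in for TupleComparator, and Map.Entry<Long, Double> replaces Tuple2<Long, Double> so the code runs without Spark.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopK {
    // Hypothetical stand-in for TupleComparator: orders (id, cost) pairs by cost.
    // Implementing Serializable matters in Spark, because the comparator is
    // shipped to the executors along with the tasks.
    public static class ValueComparator
            implements Comparator<Map.Entry<Long, Double>>, Serializable {
        @Override
        public int compare(Map.Entry<Long, Double> a, Map.Entry<Long, Double> b) {
            return Double.compare(a.getValue(), b.getValue());
        }
    }

    // Local analogue of takeOrdered(k, comp): keep the k smallest items in a
    // max-heap of size k, then return them in ascending order.
    public static <T> List<T> takeOrdered(Iterable<T> items, int k, Comparator<T> comp) {
        PriorityQueue<T> heap = new PriorityQueue<>(Math.max(1, k), comp.reversed());
        for (T item : items) {
            heap.offer(item);
            if (heap.size() > k) {
                heap.poll(); // evict the current largest item
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(comp);
        return result;
    }

    // Local analogue of top(k, comp): the k largest items, in descending order.
    public static <T> List<T> top(Iterable<T> items, int k, Comparator<T> comp) {
        return takeOrdered(items, k, comp.reversed());
    }
}
```

With this comparator, takeOrdered selects the lowest costs, which is what the clustering step needs; top with the same comparator would instead return the highest ones.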
Also, I did properly persist my RDDs, and the jobs seem to be working like a charm. I found that lots of people have this problem, but there is no answer.