I am new to Spark and MLlib. I'm trying to call StreamingKMeans from my Java application, and I get an exception that I don't understand. Here is my code for transforming the training data:
JavaDStream<Vector> trainingData = sjsc.textFileStream("/training")
        .map(new Function<String, Vector>() {
            public DenseVector call(String line) throws Exception {
                String[] lineSplit = line.split(",");
                double[] doubleValues = new double[lineSplit.length];
                for (int i = 0; i < lineSplit.length; i++) {
                    // Null or empty fields are treated as 0
                    String field = (lineSplit[i] != null && !"".equals(lineSplit[i]))
                            ? lineSplit[i] : "0";
                    doubleValues[i] = Double.parseDouble(field);
                }
                DenseVector denseV = new DenseVector(doubleValues);
                if (denseV.size() != 16) {
                    throw new Exception("All vectors are not the same size!");
                }
                System.out.println("Vector length is: " + denseV.size());
                return denseV;
            }
        });
Here is the code where I call the trainOn method:
int numDimensions = 18;
int numClusters = 2;
StreamingKMeans model = new StreamingKMeans();
model.setK(numClusters);
model.setDecayFactor(.5);
model.setRandomCenters(numDimensions, 0.0, Utils.random().nextLong());
model.trainOn(trainingData.dstream());
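Judging from the stack trace, the failure actually happens on the prediction side: I also call predictOnValues on a separate test stream (not shown here) before starting the context in the usual way:

sjsc.start();
sjsc.awaitTermination();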
And here is the exception I receive:
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:292)
at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:485)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:459)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:453)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:453)
at org.apache.spark.mllib.clustering.KMeansModel.predict(KMeansModel.scala:35)
at org.apache.spark.mllib.clustering.StreamingKMeans$$anonfun$predictOnValues$1.apply(StreamingKMeans.scala:258)
at org.apache.spark.mllib.clustering.StreamingKMeans$$anonfun$predictOnValues$1.apply(StreamingKMeans.scala:258)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$15.apply(PairRDDFunctions.scala:674)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$15.apply(PairRDDFunctions.scala:674)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
at org.apache.spark.rdd.RDD$$anonfun$33.apply(RDD.scala:1177)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
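The require that fails is inside MLUtils.fastSquaredDistance, which as far as I can tell checks that the two vectors being compared have the same size. To double-check my input, I can log the distinct vector sizes seen in each batch with something like this (a debugging sketch only, not part of the real job; it uses the Function<JavaRDD<Vector>, Void> overload of foreachRDD):

trainingData.foreachRDD(new Function<JavaRDD<Vector>, Void>() {
    public Void call(JavaRDD<Vector> rdd) throws Exception {
        // Collect the distinct vector sizes in this batch, purely for debugging
        List<Integer> sizes = rdd.map(new Function<Vector, Integer>() {
            public Integer call(Vector v) {
                return v.size();
            }
        }).distinct().collect();
        System.out.println("Distinct vector sizes in this batch: " + sizes);
        return null;
    }
});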
As you can see in the map function above, I am checking that my vectors are all the same size, and they appear to be, even though the error suggests they are not. Any help would be greatly appreciated!