How to compute the distance matrix in spark?
Asked Answered
S

3

10

I have tried pairing the samples but it costs huge amount of memory as 100 samples leads to 9900 samples which is more costly. What could be the more effective way of computing distance matrix in distributed environment in spark

Here is a snippet of pseudo code what i'm trying

val input = (sc.textFile("AirPassengers.csv",(numPartitions/2)))
val i = input.map(s => (Vectors.dense(s.split(',').map(_.toDouble))))
val indexed = i.zipWithIndex()                                                                       //Including the index of each sample
val indexedData = indexed.map{case (k,v) => (v,k)}

val pairedSamples = indexedData.cartesian(indexedData)

val filteredSamples = pairedSamples.filter{ case (x,y) =>
(x._1.toInt > y._1.toInt)  //to consider only the upper or lower trainagle
 }
filteredSamples.cache
filteredSamples.count

Above code creates the pairs but even if my dataset contains 100 samples, by pairing filteredSamples (above) results in 4950 sample which could be very costly for big data

Scorify answered 14/6, 2016 at 10:47 Comment(2)
Please post any code examples of what you've tried so far, the data or sample data that relates to your question, and any libraries or resources you have tried.Betrothed
I have added the snippet of the code. Hope it helps you what my problem isScorify
H
5

I recently answered a similar question.

Basically, it will arrive to computing n(n-1)/2 pairs, which would be 4950 computations in your example. However, what makes this approach different is that I use joins instead of cartesian. With your code, the solution would look like this:

val input = (sc.textFile("AirPassengers.csv",(numPartitions/2)))
val i = input.map(s => (Vectors.dense(s.split(',').map(_.toDouble))))
val indexed = i.zipWithIndex()

// including the index of each sample
val indexedData = indexed.map { case (k,v) => (v,k) } 

// prepare indices
val count = i.count
val indices = sc.parallelize(for(i <- 0L until count; j <- 0L until count; if i > j) yield (i, j))

val joined1 = indices.join(indexedData).map { case (i, (j, v)) => (j, (i,v)) }
val joined2 = joined1.join(indexedData).map { case (j, ((i,v1),v2)) => ((i,j),(v1,v2)) }

// after that, you can then compute the distance using your distFunc
val distRDD = joined2.mapValues{ case (v1, v2) => distFunc(v1, v2) }

Try this method and compare it with the one you already posted. Hopefully, this can speedup your code a bit.

Hilda answered 9/8, 2016 at 14:21 Comment(0)
C
1

As far as I can see from checking various sources and the Spark mllib clustering site, Spark doesn't currently support the distance or pdist matrices.

In my opinion, 100 samples will always output at least 4950 values; so manually creating a distributed matrix solver using a transformation (like .map) would be the best solution.

Clypeate answered 9/8, 2016 at 13:55 Comment(0)
P
0

This can serve as the java version of jtitusj's answer..

public JavaPairRDD<Tuple2<Long, Long>, Double> getDistanceMatrix(Dataset<Row> ds, String vectorCol) {

    JavaRDD<Vector> rdd = ds.toJavaRDD().map(new Function<Row, Vector>() {

        private static final long serialVersionUID = 1L;

        public Vector call(Row row) throws Exception {
            return row.getAs(vectorCol);
        }

    });

    List<Vector> vectors = rdd.collect();

    long count = ds.count();

    List<Tuple2<Tuple2<Long, Long>, Double>> distanceList = new ArrayList<Tuple2<Tuple2<Long, Long>, Double>>();

    for(long i=0; i < count; i++) {
        for(long j=0; j < count && i > j; j++) {
            Tuple2<Long, Long> indexPair = new Tuple2<Long, Long>(i, j);
            double d = DistanceMeasure.getDistance(vectors.get((int)i), vectors.get((int)j));
            distanceList.add(new Tuple2<Tuple2<Long, Long>, Double>(indexPair, d));
        }
    }

    return distanceList;
}
Prevail answered 31/5, 2019 at 5:50 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.