Parallel cosine similarity of two large files

I have two files: A and B

A has 400,000 lines, each containing 50 float values.
B has 40,000 lines, each containing 50 float values.

For every line in B, I need to find the lines in A that have >90% cosine similarity.
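For reference, the cosine similarity of two vectors a and b is

similarity(a, b) = (a · b) / (‖a‖ ‖b‖)

and a match is any pair of lines for which this value exceeds 0.9.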

With a linear search and per-pair computation, the code takes an enormous amount of time (40-50 hours).

I am reaching out to the community for suggestions on how to speed up the process (links to blogs/resources such as AWS/cloud services that could be used to achieve it). I have been stuck on this for quite a while!

[There were mentions of rpud/rpudplus for this, but I can't seem to run them on cloud resources.]

N.B. As requested, the code for the cosine similarity is (originally Python 2; shown here with the imports it needs):

import os
from scipy import spatial

# f1/f2 hold the labels/vectors of B; enLabelValues/enVectorValues are the
# label/vector files of A; fLabel_2/fX_2 are the output files; cnt and the
# dict a (labels already written) are set up earlier.
for line1, line2 in zip(f1, f2):
    line1 = line1[:-1]                       # strip trailing newline
    cnt = cnt + 1
    l2X = [float(i) for i in line2.split()]
    # Reopening and rescanning the whole of A for every line of B is a
    # large part of the cost
    f3 = open(enLabelValues, 'r')
    f4 = open(enVectorValues, 'r')
    print(cnt)
    cnt_j = 0
    for line3, line4 in zip(f3, f4):
        line3 = line3[:-1]
        l4X = [float(i) for i in line4.split()]
        # This is the line computing the cosine similarity
        result = 1 - spatial.distance.cosine(l2X, l4X)
        cnt_j = cnt_j + 1
        if result > 0.95:
            if line3 not in a:
                a[line3] = True
                fLabel_2.write(line3 + "\n")
                fX_2.write(line4)
        # Flushing and fsync-ing on every inner iteration is also costly
        fLabel_2.flush()
        fX_2.flush()
        os.fsync(fLabel_2.fileno())
        os.fsync(fX_2.fileno())
Greasepaint answered 4/12, 2017 at 1:52 Comment(7)
If downvoting, please share why it is being downvoted. It is a genuine question I have been working on. Let's not be bitter; let's be constructive. – Greasepaint
Where is the code to measure the similarity for one line, please? – Somersault
What sort of a machine do you have available for this? – Somersault
@MarkSetchell Added the code. I was using a CPU with 8GB, but have GPU access as well. Not sure how to go about parallelizing them. (Since every line in A is to be compared with B, we could in theory parallelize by duplicating B and having parts of A compared with the replicated files for B.) – Greasepaint
Yikes! I'm not a bit surprised it's taking 40+ hours. Your code isn't at all parallelised, you are using Python, and you aren't using SIMD. Also, although this would be more relevant for a parallelised version, you don't pre-compute the magnitudes just once, nor do you write the files as binary. And yes, do run multiple threads/processes where each one loads the smaller (B) of the datasets into memory and checks a subset of A against it. – Somersault
@MarkSetchell Yes, I figured I should go for GPU code, so I reached out to the community before re-inventing the wheel for established solutions. I was trying out rpud/rpudplus, but they are apparently not readily available in the cloud solutions! Will add details when I figure out the slicker approach! – Greasepaint
Python is horrible for doing mass computations. What it is good at is controlling libraries that do mass computations. So see if you can express your problem using matrices and then use TensorFlow, NumPy, or similar tools (see the sketch below). #43358232 looks relevant. – Appomattox
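To make that last comment concrete, here is a minimal NumPy sketch of the matrix formulation (the file names, the chunk size, and the float32 dtype are illustrative assumptions; note the question's text asks for >0.9 while its code tests >0.95):

import numpy as np

# Load the whitespace-separated float files (hypothetical names)
A = np.loadtxt("a.txt", dtype=np.float32)    # shape (400000, 50)
B = np.loadtxt("b.txt", dtype=np.float32)    # shape (40000, 50)

# Normalise each row once, so cosine similarity reduces to a dot product
# (assumes no all-zero rows)
A = A / np.linalg.norm(A, axis=1, keepdims=True)
B = B / np.linalg.norm(B, axis=1, keepdims=True)

CHUNK = 500                                  # 500 x 400000 float32 is ~0.8 GB
for start in range(0, B.shape[0], CHUNK):
    sims = B[start:start + CHUNK] @ A.T      # cosine similarities for this chunk
    b_idx, a_idx = np.nonzero(sims > 0.9)    # index pairs above the threshold
    for b, a in zip(b_idx, a_idx):
        print(start + b, a, sims[b, a])

Since the heavy lifting is a single matrix multiply executed by optimised BLAS code, this should run in minutes at most rather than hours, before any explicit parallelisation.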

I can generate synthetic files of 40,000 and 400,000 lines with 50 samples per line, and process them in around 2 minutes 18 seconds on a reasonable 4-core (+hyperthreading) desktop iMac, in my clumsy style of C++ without any SIMD optimisation (by me), using GNU Parallel.

Here is the top-level script. It generates the test data in "a.txt" and "b.txt", then "compresses" "b.txt" into an equivalent binary representation with the pre-computed magnitude appended to each line. Finally, it numbers the lines in "a.txt" and passes them into GNU Parallel, which splits them into batches of around 5,200 lines and runs 8 parallel processes, each comparing its batch against the 40,000 lines of B.

#!/bin/bash

# Generate test data - a.txt b.txt
./generate

# Preprocess b.txt into binary with precomputed magnitudes, saved as B
./preprocess

# Process file A in batches
cat -n a.txt | parallel --block-size 2M --line-buffer --pipe ./process {#}

Here is the generate.cpp program to synthesise data:

#include <iostream>
#include <cstdlib>
#include <fstream>
#include "common.h"

using namespace std;

int main()
{
   int line,sample;
   ofstream a("a.txt");
   if (!a.is_open()){
      cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }
   for(line=0;line<ALINES;line++){
      for(sample=0;sample<SAMPLESPERLINE;sample++){
         a << (float)rand()*100/RAND_MAX << " ";
      }
      a << endl;
   }
   a.close();
   ofstream b("b.txt");
   if (!b.is_open()){
      cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }
   for(line=0;line<BLINES;line++){
      for(sample=0;sample<SAMPLESPERLINE;sample++){
         b << (float)rand()*100/RAND_MAX << " ";
      }
      b << endl;
   }
   b.close();
}

Here is the preprocess.cpp code:

#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]){

   std::ifstream btxt("b.txt");
   std::ofstream bbin("B",std::ios::out|std::ios::binary);
   if (!btxt.is_open()){
      std::cerr << "ERROR: Unable to open input file";
      exit(EXIT_FAILURE);
   }
   if (!bbin.is_open()){
      std::cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }

   int l=0;
   std::string line;
   std::vector<float> v;
   v.resize(SAMPLESPERLINE+1);
   while (std::getline(btxt,line)){
      std::istringstream iss(line);
      // NB: v was resized above and is fully overwritten below;
      // calling v.clear() here would make the v[s] writes out of bounds
      float f;
      double magnitude=0.0;
      int s=0;
      while (iss >> f){
         v[s]=f;
         magnitude+=(double)f*f;
         s++;
      }
      // Append the magnitude to the end of the "line"
      v[s]=(float)sqrt(magnitude);
      // Write the samples and magnitude in binary to the output file
      bbin.write(reinterpret_cast<char*>(&v[0]),(SAMPLESPERLINE+1)*sizeof(float));
      l++;
   }
   btxt.close();
   bbin.close();

   return EXIT_SUCCESS;
}

Here is the common.h file:

const int ALINES=400000;
const int BLINES=40000;
const int SAMPLESPERLINE=50;

And here is the process.cpp code:

#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <array>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]){

   if(argc!=2){
      std::cerr << "Usage: process JOBNUM" << std::endl;
      exit(1);
   }
   int JobNum=std::atoi(argv[1]);
   std::cerr << "Starting job: " << JobNum << std::endl;

   // Load B
   std::ifstream bbin("B",std::ios::binary);
   if (!bbin.is_open()){
      std::cerr << "ERROR: Unable to open B";
      exit(EXIT_FAILURE);
   }

   int l=0;
   // Each B record holds SAMPLESPERLINE samples plus the precomputed magnitude
   std::vector<std::array<float,SAMPLESPERLINE+1>> B;
   B.resize(BLINES);
   for(l=0;l<BLINES;l++){
      // Read one record of 50 floats and their magnitude
      bbin.read(reinterpret_cast<char*>(&B[l][0]),sizeof(float)*(SAMPLESPERLINE+1));
   }
   bbin.close();

   // Process all lines read from stdin, each line prepended by its line number
   // Format is:
   // <line number in file "a.txt"> <SAMPLE0> <SAMPLE1> ... <SAMPLE49>
   int nLines=0;
   std::string line;
   while (std::getline(std::cin,line)){
      nLines++;
      std::istringstream iss(line);
      std::vector<float> A;
      A.resize(SAMPLESPERLINE);
      float f;
      int Alineno;
      int s=0;
      iss >> Alineno;
      double dMag=0.0;
      while (iss >> f){
         A[s++]=f;
         dMag+=(double)f*f;
      }
      // Root magnitude
      float AMagnitude=(float)sqrt(dMag);

      // At this point we have in B, 40,000 records each of 50 samples followed by the magnitude
      // ... and we have a single record from "a.txt" with 50 samples and its magnitude in AMagnitude
      // ... and Alineno is the absolute line number in "a.txt" of this line
      // Time to do the actual calculation: compare this record to all records in B
      for(int brec=0;brec<BLINES;brec++){
         float BMagnitude=B[brec][SAMPLESPERLINE];
         double dotproduct=0.0;
         float *a = &A[0];
         float *b = &B[brec][0];
         for(s=0;s<SAMPLESPERLINE;s++){
            dotproduct += (*a++) * (*b++);
         }
         float similarity = dotproduct/(AMagnitude*BMagnitude);
         if(similarity>0.99){
            std::cout << "Line A: " << Alineno << ", line B: " << brec << ", similarity:" << similarity << std::endl;
         }
      }
   }
   std::cerr << "Ending job: " << JobNum << ", processed " << nLines << " lines" << std::endl;

   return EXIT_SUCCESS;
}

The Makefile is pretty simple:

CFLAGS= -std=c++11 -O3 -march=native

all:    generate preprocess process

generate:   generate.cpp
        clang++ ${CFLAGS} generate.cpp -o generate

preprocess: preprocess.cpp
        clang++ ${CFLAGS} preprocess.cpp -o preprocess

process:    process.cpp
        clang++ ${CFLAGS} process.cpp -o process

When you run it, it pegs the CPU for 2 minutes and looks like this:

time ./go
Starting job: 3
Starting job: 7
Starting job: 8
Starting job: 2
Starting job: 5
Starting job: 1
Starting job: 4
Starting job: 6
Ending job: 1, processed 5204 lines
Starting job: 9
Ending job: 2, processed 5203 lines
Ending job: 3, processed 5204 lines
Starting job: 11
Starting job: 10
Ending job: 4, processed 5204 lines
Starting job: 12
Ending job: 5, processed 5203 lines
Ending job: 6, processed 5203 lines
Starting job: 14
Starting job: 13
...
...
Starting job: 75
Ending job: 68, processed 5204 lines
Ending job: 69, processed 5203 lines
Starting job: 76
Starting job: 77
Ending job: 70, processed 5203 lines
Ending job: 71, processed 5204 lines
Ending job: 72, processed 5203 lines
Ending job: 77, processed 4535 lines
Ending job: 74, processed 5204 lines
Ending job: 73, processed 5205 lines
Ending job: 75, processed 5204 lines
Ending job: 76, processed 5203 lines

real    2m17.510s
user    16m24.533s
sys     0m4.426s

Note that I have not done any explicit SIMD, loop-unrolling, or intrinsics to form the dot-product. I suspect that if you asked a question about forming a dot-product and tagged it with simd or avx, someone would help you optimise it.


Note also that you could easily run this code across multiple computers with GNU Parallel, assuming you have ssh login to them, just using:

parallel -S host1,host2,host3 ....

For example, I have a 6-core Debian PC on my network, so I ran the above code parallelised across my 4-core Mac and 6-core Debian machine with:

parallel -S :,debian ...

and it then takes 1 minute 8 seconds.

Somersault answered 5/12, 2017 at 12:45 Comment(0)
