How to quickly compare two text files and get unique rows?

I have 2 text files (*.txt) that contain unique strings in the format:

udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi

The first file contains 50 million such lines (4.3 GB), and the second contains 1 million lines (112 MB). Each line consists of 40 characters, a : delimiter, and 45 more characters.

Task: get the unique values for both files. That is, I need a csv or txt file with the lines that are in the second file but not in the first.

I am trying to do this using vaex:

import vaex

base_files = ['file1.txt']
for i, txt_file in enumerate(base_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')

check_files = ['file2.txt']
for i, txt_file in enumerate(check_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')

dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base, on='data', how='inner', inplace=True)
dv_result.export(path='result.csv')

As a result, I get a result.csv file with the unique row values. But the verification process takes a very long time. In addition, it uses all available RAM and all processor resources. How can this process be accelerated? What am I doing wrong? What can be done better? Is it worth using other libraries (pandas, dask) for this check, and would they be faster?


UPD 10.11.2020: So far, I have not found anything faster than the following option:

from io import StringIO


def read_lines(contents):
    '''Yield the lines of an in-memory string, without trailing newlines.'''
    handle = StringIO(contents)
    for line in handle:
        yield line.rstrip('\n')


def read_in_chunks(file_obj, chunk_size=10485760):
    '''Read a file object in 10 MB chunks.

    Note: a chunk boundary can split a line in two, so a few matches
    near the edges of chunks may be missed.
    '''
    while True:
        data = file_obj.read(chunk_size)
        if not data:
            break
        yield data


with open('check.txt', 'r', errors='ignore') as file_check:
    check_set = {elem for elem in read_lines(file_check.read())}

with open(file='base.txt', mode='r', errors='ignore') as file_base:
    for idx, chunk in enumerate(read_in_chunks(file_base), 1):
        print(f'Checked [{idx}0 Mb]')
        for elem in read_lines(chunk):
            if elem in check_set:
                check_set.remove(elem)

print(f'Unique rows: [{len(check_set)}]')

UPD 11.11.2020: Thanks to @m9_psy for the performance tips. It really is faster! Currently, the fastest way is:

from io import BytesIO

# read the whole check file as raw bytes; iterating over BytesIO yields
# byte lines that keep their trailing b'\n'
with open('check.txt', 'rb') as file_check:
    check_set = {elem for elem in BytesIO(file_check.read())}

# stream the base file line by line (also as raw bytes) and discard
# every line that appears in both files
with open('base.txt', 'rb') as file_base:
    for line in file_base:
        if line in check_set:
            check_set.remove(line)

print(f'Unique rows: [{len(check_set)}]')
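
The remaining entries in check_set are the lines that exist only in check.txt. Writing them out is straightforward (a minimal sketch; result.txt is an arbitrary output name):

# each element is a raw byte line (usually still ending in b'\n'),
# so it can be written back unchanged
with open('result.txt', 'wb') as out:
    for line in check_set:
        out.write(line)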

Is there a way to further speed up this process?

Salts answered 1/11, 2020 at 9:28 Comment(9)
Could you elaborate a bit on which process takes a very long time? Is it the conversion from csv to hdf5? Or is it the join that is slow and uses memory? – Expect
@Maarten Breddels Conversion to hdf5 is fast. Checking strings is slow. – Unexampled
By checking you mean the join? – Expect
@Maarten Breddels Not really. I just need to get the lines that are in the second file and that are not in the first file. The second file is always smaller than the first. – Unexampled
Have you considered the alternative of simply using awk on the command line? If I understood your requirements correctly (i.e. return only lines which are present in file2.txt, but not already in file1.txt), this answer should do the job just fine. Note that you need to redirect the result to a file, i.e. awk 'FNR==NR {a[$0]++; next} !($0 in a)' file1.txt file2.txt > result.txt – Illuminant
Stuff that should work: sort the data beforehand and/or index the data to be able to join it without doing a full table scan. Compressing the data is also a good idea (fewer I/O and memory operations); columnar storage is a great option. – Rotl
There is a way to speed up the fastest StringIO version: 1) Use BytesIO and 'rb' reads, avoiding string decoding and operating on raw bytes. Funny thing, BytesIO is even faster than splitting the string with .split(), which raises some questions. 2) Get rid of the generator in read_lines and return just handle; you avoid the extra generator calls. 3) A questionable measure, but you can drop the rstrip calls as well; it is fine for counting, because all strings (not just some) will have '\n' at the end. – Mirilla
@Mirilla Thanks for the performance tips. It really is faster! I updated my question and added a new version of the code. – Unexampled
This project does exactly this: github.com/nil0x42/duplicut – Porbeagle

I have a suspicion that the join operation requires n * m comparison operations, where n and m are the lengths of the two dataframes.

Also, there's an inconsistency between your description and your code:

  • "That is, you need a csv or txt file with lines that are in the second file and which are not in the first." ⟶ this means in dv_check but not in dv_base
  • dv_check.join(dv_base, on='data', how='inner', inplace=True) ⟶ this means in both dv_check and dv_base

Anyhow, an idea is to use a set, since checking for membership in a set has a time complexity of O(1) while checking membership in a list has a complexity of O(n). If you are familiar with the SQL world, this is equivalent to moving from a LOOP JOIN strategy to a HASH JOIN strategy:

# This will take care of removing the duplicates
base_set = set(dv_base['data'])
check_set = set(dv_check['data'])

# In `dv_check` but not `dv_base`
keys = check_set - base_set

# In both `dv_check` and `dv_base`
keys = check_set & base_set

This only gives you the keys that satisfy your condition. You still have to filter the two dataframes to get the other attributes.
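
Since the data column here is the entire line, the keys are already the rows you are after and can be written straight to a file. If you do want to filter the dataframes themselves, one option is the sketch below, assuming vaex's Expression.isin accepts a plain Python collection (it may be slow for a very large key set):

# keep only the rows of dv_check whose 'data' value is among the keys
result = dv_check[dv_check['data'].isin(list(keys))]
result.export(path='result.csv')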

Finished in 1 minute and 14 seconds on my 2014 iMac with 16GB of RAM.

Macrae answered 1/11, 2020 at 23:31 Comment(3)
I get an error on the line base_set = set(dv_base['data']): TypeError: list indices must be integers or slices, not str. Screen: prnt.sc/vc0quw – Unexampled
Replaced base_set = set(dv_base['data']) with base_set = set(dv_base['data'].unique()) and it worked. Is it possible to somehow limit the memory usage in my script? For example, for the case of checking files larger than 100 GB. – Unexampled
You can load dv_base in small chunks (say, 10M rows) at a time. You still have not resolved the inconsistency I mentioned above. – Macrae

Let's generate a dataset to mimic your example

import vaex
import numpy as np
N = 50_000_000  # 50 million rows for base
N2 = 1_000_000  # 1 million for check
M = 40+1+45     # chars for each string
N_dup = 10_000  # number of duplicate rows in the checks

s1 = np.random.randint(ord('a'), ord('z'), (N, M), np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'), ord('z'), (N2, M), np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2, N_dup, replace=False)
s2[dups] = s1[np.random.choice(N, N_dup, replace=False)]

# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')

Now we can left-join check to base: rows of check that have no match in base end up with a missing value in s_other, while matched rows carry the matching string. Dropping the rows where s_other is missing, as below, leaves the rows present in both files (the duplicates); to get the rows that are in check but not in base, keep only the missing rows instead (see the sketch after the output):

import vaex
base = vaex.open('/data/tmp/base.hdf5')
check = vaex.open('/data/tmp/check.hdf5')
# after the left join, s_other is missing for the rows of check not found in base
joined = check.join(base, on='s', how='left', rsuffix='_other')
# drop the unmatched rows: what remains are the rows present in both files
unique = joined.dropmissing(['s_other'])
# and we have the N_dup duplicated rows left
unique
#      s                                                    s_other
0      'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfh...  'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfhb...
1      'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikh...  'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikhh...
2      'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhact...  'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhactn...
3      'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjk...  'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjku...
4      'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvb...  'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvbs...
...    ...                                                  ...
9,995  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwn...  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwno...
9,996  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogms...  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogmsb...
9,997  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknji...  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknjii...
9,998  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmv...  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmvo...
9,999  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfeb...  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfebi...
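
To get the rows of check that are not in base (what the question asks for), keep the rows where s_other is missing instead of dropping them. A minimal sketch, assuming Expression.ismissing is available in your vaex version:

# rows of check with no match in base have a missing s_other after the left join
only_in_check = joined[joined['s_other'].ismissing()]
only_in_check.export('/data/tmp/only_in_check.csv')
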
Expect answered 6/11, 2020 at 11:39 Comment(0)

Here is another approach. The check file is about 0.1 GB (it fits in memory). The base file is up to 100 GB (so process it one line at a time).

Create the test data and a generator function to import the data:

from io import StringIO

# test data for base (>50 million lines)
base_file = '''a
b
c
d
e
'''

# test data for check (1 million lines)
check_file = '''d
e
f
g
'''

def read_lines(contents):
    ''' Yield the lines of an in-memory string, one at a time (generator function).'''
    handle = StringIO(contents)
    for line in handle:
        yield line.rstrip('\n')

Find elements in the check file only (check_set - base_set in @CodeDifferent's example)

check_set = {elem for elem in read_lines(check_file)}

for elem in read_lines(base_file):
    if elem in check_set:
        check_set.remove(elem)
print(check_set)
{'g', 'f'}

Find intersection (check_set & base_set in @CodeDifferent's example)

check_set = {elem for elem in read_lines(check_file)}

common_elements = set()
for elem in read_lines(base_file):
    if elem in check_set:
        common_elements.add(elem)
print(common_elements)
{'d', 'e'}

I think this approach would work best when (a) the base file is much larger than the check file and (b) the base file is too big for an in-memory data structure.
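
With real files on disk, the same pattern applies by iterating over the file handle directly, so the base file never has to fit in memory. A minimal sketch, with base.txt and check.txt as placeholder file names:

# load the small check file into a set of stripped lines
with open('check.txt', 'r', errors='ignore') as f:
    check_set = {line.rstrip('\n') for line in f}

# stream the big base file line by line and discard every common line
with open('base.txt', 'r', errors='ignore') as f:
    for line in f:
        check_set.discard(line.rstrip('\n'))

# whatever remains exists only in the check file
print(f'Unique rows: [{len(check_set)}]')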

Webbed answered 5/11, 2020 at 17:23 Comment(0)

Note! My original answer is wrong; @codedifferent is right. Here is my slightly different version, which may help somebody. I assume that the text file contains only one column.

import pandas as pd

filepath_check = './data/checkfile.csv'
filepath_base = './data/basefile.csv'

# the files have no header row (per the question), so name the single column explicitly;
# load the small data into memory
dfcheck = pd.read_csv(filepath_check, names=['data'])
dfcheck = set(dfcheck['data'])

# but load the big data in chunks
chunk_iter = pd.read_csv(filepath_base, names=['data'], chunksize=100000)

# for each chunk, remove the intersection, if any
for chunk in chunk_iter:
    dfcheck = dfcheck - set(chunk['data'])
    print(len(dfcheck))

# write the result
with open('./results.txt', 'w') as filehandler:
    for item in dfcheck:
        filehandler.write('%s\n' % item)

old answer

I faced a similar problem recently. My solution is to use Dask, but Vaex should be fine as well.

import dask.dataframe as dd

base_file = dd.read_csv('./base_file.csv')
check_file = dd.read_csv('./check_file.csv')

base_file = base_file.set_index('data')
check_file = check_file.set_index('data')

base_file.to_parquet('./results/base_file', compression=None)
check_file.to_parquet('./results/check_file', compression=None)

base_file = dd.read_parquet('./results/base_file')
check_file = dd.read_parquet('./results/check_file')
merged = dd.merge(base_file, check_file, left_index=True, right_index=True)

# save to csv from the dask dataframe
merged.to_csv('./results/dask_result.csv', single_file=True)

# or save to csv from a pandas dataframe
pandas_merged = merged.compute()  # convert to pandas
pandas_merged.to_csv(...)
  1. Why set_index? It makes the join faster. https://docs.dask.org/en/latest/dataframe-best-practices.html#joins
  2. Why save to parquet just to read it back afterward? In my experience, even with Dask, reading CSV directly takes more memory, and loading a parquet file is definitely faster. https://docs.dask.org/en/latest/dataframe-best-practices.html#store-data-in-apache-parquet-format. I have many save/load lines to save the result after expensive operations such as join and set_index.
  3. If check_file is small enough, you can load the whole check_file into memory with check_file = check_file.persist() after loading the file, or wherever necessary.
Shebeen answered 9/11, 2020 at 23:41 Comment(6)
Your way keeps the duplicate lines: merged stores the lines that are in both files, while I need the unique lines: those that are in the second file but not in the first file, i.e. check_file - merged. – Unexampled
UPD: this works: merged = base_file.merge(check_file, how='outer', indicator=True).loc[lambda x: x['_merge'] == 'right_only'] – Unexampled
Glad something works for you. Pardon my wrong answer; the @codedifferent answer is right and efficient. Does the text file contain only one column? – Shebeen
Yes, it contains one column. I am looking for any way to get the unique strings from 2 text files quickly. It doesn't have to be pandas, dask, or vaex. Do you know of other ways? – Unexampled
I updated my question to add the (currently) fastest way to compare two files. – Unexampled
This is a good solution, but not fast enough. The method I added to my question (StringIO) is about twice as fast. – Unexampled
